+++ /dev/null
-# Copyright (C) 2003 Python Software Foundation\r
-\r
-import unittest\r
-import shutil\r
-import tempfile\r
-import sys\r
-import stat\r
-import os\r
-import os.path\r
-from os.path import splitdrive\r
-from distutils.spawn import find_executable, spawn\r
-from shutil import (_make_tarball, _make_zipfile, make_archive,\r
- register_archive_format, unregister_archive_format,\r
- get_archive_formats)\r
-import tarfile\r
-import warnings\r
-\r
-from test import test_support\r
-from test.test_support import TESTFN, check_warnings, captured_stdout\r
-\r
-TESTFN2 = TESTFN + "2"\r
-\r
-try:\r
- import grp\r
- import pwd\r
- UID_GID_SUPPORT = True\r
-except ImportError:\r
- UID_GID_SUPPORT = False\r
-\r
-try:\r
- import zlib\r
-except ImportError:\r
- zlib = None\r
-\r
-try:\r
- import zipfile\r
- ZIP_SUPPORT = True\r
-except ImportError:\r
- ZIP_SUPPORT = find_executable('zip')\r
-\r
-class TestShutil(unittest.TestCase):\r
-\r
- def setUp(self):\r
- super(TestShutil, self).setUp()\r
- self.tempdirs = []\r
-\r
- def tearDown(self):\r
- super(TestShutil, self).tearDown()\r
- while self.tempdirs:\r
- d = self.tempdirs.pop()\r
- shutil.rmtree(d, os.name in ('nt', 'cygwin'))\r
-\r
- def write_file(self, path, content='xxx'):\r
- """Writes a file in the given path.\r
-\r
-\r
- path can be a string or a sequence.\r
- """\r
- if isinstance(path, (list, tuple)):\r
- path = os.path.join(*path)\r
- f = open(path, 'w')\r
- try:\r
- f.write(content)\r
- finally:\r
- f.close()\r
-\r
- def mkdtemp(self):\r
- """Create a temporary directory that will be cleaned up.\r
-\r
- Returns the path of the directory.\r
- """\r
- d = tempfile.mkdtemp()\r
- self.tempdirs.append(d)\r
- return d\r
- def test_rmtree_errors(self):\r
- # filename is guaranteed not to exist\r
- filename = tempfile.mktemp()\r
- self.assertRaises(OSError, shutil.rmtree, filename)\r
-\r
- # See bug #1071513 for why we don't run this on cygwin\r
- # and bug #1076467 for why we don't run this as root.\r
- if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'\r
- and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):\r
- def test_on_error(self):\r
- self.errorState = 0\r
- os.mkdir(TESTFN)\r
- self.childpath = os.path.join(TESTFN, 'a')\r
- f = open(self.childpath, 'w')\r
- f.close()\r
- old_dir_mode = os.stat(TESTFN).st_mode\r
- old_child_mode = os.stat(self.childpath).st_mode\r
- # Make unwritable.\r
- os.chmod(self.childpath, stat.S_IREAD)\r
- os.chmod(TESTFN, stat.S_IREAD)\r
-\r
- shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)\r
- # Test whether onerror has actually been called.\r
- self.assertEqual(self.errorState, 2,\r
- "Expected call to onerror function did not happen.")\r
-\r
- # Make writable again.\r
- os.chmod(TESTFN, old_dir_mode)\r
- os.chmod(self.childpath, old_child_mode)\r
-\r
- # Clean up.\r
- shutil.rmtree(TESTFN)\r
-\r
- def check_args_to_onerror(self, func, arg, exc):\r
- # test_rmtree_errors deliberately runs rmtree\r
- # on a directory that is chmod 400, which will fail.\r
- # This function is run when shutil.rmtree fails.\r
- # 99.9% of the time it initially fails to remove\r
- # a file in the directory, so the first time through\r
- # func is os.remove.\r
- # However, some Linux machines running ZFS on\r
- # FUSE experienced a failure earlier in the process\r
- # at os.listdir. The first failure may legally\r
- # be either.\r
- if self.errorState == 0:\r
- if func is os.remove:\r
- self.assertEqual(arg, self.childpath)\r
- else:\r
- self.assertIs(func, os.listdir,\r
- "func must be either os.remove or os.listdir")\r
- self.assertEqual(arg, TESTFN)\r
- self.assertTrue(issubclass(exc[0], OSError))\r
- self.errorState = 1\r
- else:\r
- self.assertEqual(func, os.rmdir)\r
- self.assertEqual(arg, TESTFN)\r
- self.assertTrue(issubclass(exc[0], OSError))\r
- self.errorState = 2\r
-\r
- def test_rmtree_dont_delete_file(self):\r
- # When called on a file instead of a directory, don't delete it.\r
- handle, path = tempfile.mkstemp()\r
- os.fdopen(handle).close()\r
- self.assertRaises(OSError, shutil.rmtree, path)\r
- os.remove(path)\r
-\r
- def test_copytree_simple(self):\r
- def write_data(path, data):\r
- f = open(path, "w")\r
- f.write(data)\r
- f.close()\r
-\r
- def read_data(path):\r
- f = open(path)\r
- data = f.read()\r
- f.close()\r
- return data\r
-\r
- src_dir = tempfile.mkdtemp()\r
- dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')\r
-\r
- write_data(os.path.join(src_dir, 'test.txt'), '123')\r
-\r
- os.mkdir(os.path.join(src_dir, 'test_dir'))\r
- write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')\r
-\r
- try:\r
- shutil.copytree(src_dir, dst_dir)\r
- self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))\r
- self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))\r
- self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',\r
- 'test.txt')))\r
- actual = read_data(os.path.join(dst_dir, 'test.txt'))\r
- self.assertEqual(actual, '123')\r
- actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))\r
- self.assertEqual(actual, '456')\r
- finally:\r
- for path in (\r
- os.path.join(src_dir, 'test.txt'),\r
- os.path.join(dst_dir, 'test.txt'),\r
- os.path.join(src_dir, 'test_dir', 'test.txt'),\r
- os.path.join(dst_dir, 'test_dir', 'test.txt'),\r
- ):\r
- if os.path.exists(path):\r
- os.remove(path)\r
- for path in (src_dir,\r
- os.path.dirname(dst_dir)\r
- ):\r
- if os.path.exists(path):\r
- shutil.rmtree(path)\r
-\r
- def test_copytree_with_exclude(self):\r
-\r
- def write_data(path, data):\r
- f = open(path, "w")\r
- f.write(data)\r
- f.close()\r
-\r
- def read_data(path):\r
- f = open(path)\r
- data = f.read()\r
- f.close()\r
- return data\r
-\r
- # creating data\r
- join = os.path.join\r
- exists = os.path.exists\r
- src_dir = tempfile.mkdtemp()\r
- try:\r
- dst_dir = join(tempfile.mkdtemp(), 'destination')\r
- write_data(join(src_dir, 'test.txt'), '123')\r
- write_data(join(src_dir, 'test.tmp'), '123')\r
- os.mkdir(join(src_dir, 'test_dir'))\r
- write_data(join(src_dir, 'test_dir', 'test.txt'), '456')\r
- os.mkdir(join(src_dir, 'test_dir2'))\r
- write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')\r
- os.mkdir(join(src_dir, 'test_dir2', 'subdir'))\r
- os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))\r
- write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')\r
- write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')\r
-\r
-\r
- # testing glob-like patterns\r
- try:\r
- patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')\r
- shutil.copytree(src_dir, dst_dir, ignore=patterns)\r
- # checking the result: some elements should not be copied\r
- self.assertTrue(exists(join(dst_dir, 'test.txt')))\r
- self.assertTrue(not exists(join(dst_dir, 'test.tmp')))\r
- self.assertTrue(not exists(join(dst_dir, 'test_dir2')))\r
- finally:\r
- if os.path.exists(dst_dir):\r
- shutil.rmtree(dst_dir)\r
- try:\r
- patterns = shutil.ignore_patterns('*.tmp', 'subdir*')\r
- shutil.copytree(src_dir, dst_dir, ignore=patterns)\r
- # checking the result: some elements should not be copied\r
- self.assertTrue(not exists(join(dst_dir, 'test.tmp')))\r
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))\r
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))\r
- finally:\r
- if os.path.exists(dst_dir):\r
- shutil.rmtree(dst_dir)\r
-\r
- # testing callable-style\r
- try:\r
- def _filter(src, names):\r
- res = []\r
- for name in names:\r
- path = os.path.join(src, name)\r
-\r
- if (os.path.isdir(path) and\r
- path.split()[-1] == 'subdir'):\r
- res.append(name)\r
- elif os.path.splitext(path)[-1] in ('.py'):\r
- res.append(name)\r
- return res\r
-\r
- shutil.copytree(src_dir, dst_dir, ignore=_filter)\r
-\r
- # checking the result: some elements should not be copied\r
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',\r
- 'test.py')))\r
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))\r
-\r
- finally:\r
- if os.path.exists(dst_dir):\r
- shutil.rmtree(dst_dir)\r
- finally:\r
- shutil.rmtree(src_dir)\r
- shutil.rmtree(os.path.dirname(dst_dir))\r
-\r
- if hasattr(os, "symlink"):\r
- def test_dont_copy_file_onto_link_to_itself(self):\r
- # bug 851123.\r
- os.mkdir(TESTFN)\r
- src = os.path.join(TESTFN, 'cheese')\r
- dst = os.path.join(TESTFN, 'shop')\r
- try:\r
- f = open(src, 'w')\r
- f.write('cheddar')\r
- f.close()\r
-\r
- os.link(src, dst)\r
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)\r
- with open(src, 'r') as f:\r
- self.assertEqual(f.read(), 'cheddar')\r
- os.remove(dst)\r
-\r
- # Using `src` here would mean we end up with a symlink pointing\r
- # to TESTFN/TESTFN/cheese, while it should point at\r
- # TESTFN/cheese.\r
- os.symlink('cheese', dst)\r
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)\r
- with open(src, 'r') as f:\r
- self.assertEqual(f.read(), 'cheddar')\r
- os.remove(dst)\r
- finally:\r
- try:\r
- shutil.rmtree(TESTFN)\r
- except OSError:\r
- pass\r
-\r
- def test_rmtree_on_symlink(self):\r
- # bug 1669.\r
- os.mkdir(TESTFN)\r
- try:\r
- src = os.path.join(TESTFN, 'cheese')\r
- dst = os.path.join(TESTFN, 'shop')\r
- os.mkdir(src)\r
- os.symlink(src, dst)\r
- self.assertRaises(OSError, shutil.rmtree, dst)\r
- finally:\r
- shutil.rmtree(TESTFN, ignore_errors=True)\r
-\r
- if hasattr(os, "mkfifo"):\r
- # Issue #3002: copyfile and copytree block indefinitely on named pipes\r
- def test_copyfile_named_pipe(self):\r
- os.mkfifo(TESTFN)\r
- try:\r
- self.assertRaises(shutil.SpecialFileError,\r
- shutil.copyfile, TESTFN, TESTFN2)\r
- self.assertRaises(shutil.SpecialFileError,\r
- shutil.copyfile, __file__, TESTFN)\r
- finally:\r
- os.remove(TESTFN)\r
-\r
- def test_copytree_named_pipe(self):\r
- os.mkdir(TESTFN)\r
- try:\r
- subdir = os.path.join(TESTFN, "subdir")\r
- os.mkdir(subdir)\r
- pipe = os.path.join(subdir, "mypipe")\r
- os.mkfifo(pipe)\r
- try:\r
- shutil.copytree(TESTFN, TESTFN2)\r
- except shutil.Error as e:\r
- errors = e.args[0]\r
- self.assertEqual(len(errors), 1)\r
- src, dst, error_msg = errors[0]\r
- self.assertEqual("`%s` is a named pipe" % pipe, error_msg)\r
- else:\r
- self.fail("shutil.Error should have been raised")\r
- finally:\r
- shutil.rmtree(TESTFN, ignore_errors=True)\r
- shutil.rmtree(TESTFN2, ignore_errors=True)\r
-\r
- @unittest.skipUnless(zlib, "requires zlib")\r
- def test_make_tarball(self):\r
- # creating something to tar\r
- tmpdir = self.mkdtemp()\r
- self.write_file([tmpdir, 'file1'], 'xxx')\r
- self.write_file([tmpdir, 'file2'], 'xxx')\r
- os.mkdir(os.path.join(tmpdir, 'sub'))\r
- self.write_file([tmpdir, 'sub', 'file3'], 'xxx')\r
-\r
- tmpdir2 = self.mkdtemp()\r
- unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],\r
- "source and target should be on same drive")\r
-\r
- base_name = os.path.join(tmpdir2, 'archive')\r
-\r
- # working with relative paths to avoid tar warnings\r
- old_dir = os.getcwd()\r
- os.chdir(tmpdir)\r
- try:\r
- _make_tarball(splitdrive(base_name)[1], '.')\r
- finally:\r
- os.chdir(old_dir)\r
-\r
- # check if the compressed tarball was created\r
- tarball = base_name + '.tar.gz'\r
- self.assertTrue(os.path.exists(tarball))\r
-\r
- # trying an uncompressed one\r
- base_name = os.path.join(tmpdir2, 'archive')\r
- old_dir = os.getcwd()\r
- os.chdir(tmpdir)\r
- try:\r
- _make_tarball(splitdrive(base_name)[1], '.', compress=None)\r
- finally:\r
- os.chdir(old_dir)\r
- tarball = base_name + '.tar'\r
- self.assertTrue(os.path.exists(tarball))\r
-\r
- def _tarinfo(self, path):\r
- tar = tarfile.open(path)\r
- try:\r
- names = tar.getnames()\r
- names.sort()\r
- return tuple(names)\r
- finally:\r
- tar.close()\r
-\r
- def _create_files(self):\r
- # creating something to tar\r
- tmpdir = self.mkdtemp()\r
- dist = os.path.join(tmpdir, 'dist')\r
- os.mkdir(dist)\r
- self.write_file([dist, 'file1'], 'xxx')\r
- self.write_file([dist, 'file2'], 'xxx')\r
- os.mkdir(os.path.join(dist, 'sub'))\r
- self.write_file([dist, 'sub', 'file3'], 'xxx')\r
- os.mkdir(os.path.join(dist, 'sub2'))\r
- tmpdir2 = self.mkdtemp()\r
- base_name = os.path.join(tmpdir2, 'archive')\r
- return tmpdir, tmpdir2, base_name\r
-\r
- @unittest.skipUnless(zlib, "Requires zlib")\r
- @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),\r
- 'Need the tar command to run')\r
- def test_tarfile_vs_tar(self):\r
- tmpdir, tmpdir2, base_name = self._create_files()\r
- old_dir = os.getcwd()\r
- os.chdir(tmpdir)\r
- try:\r
- _make_tarball(base_name, 'dist')\r
- finally:\r
- os.chdir(old_dir)\r
-\r
- # check if the compressed tarball was created\r
- tarball = base_name + '.tar.gz'\r
- self.assertTrue(os.path.exists(tarball))\r
-\r
- # now create another tarball using `tar`\r
- tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')\r
- tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']\r
- gzip_cmd = ['gzip', '-f9', 'archive2.tar']\r
- old_dir = os.getcwd()\r
- os.chdir(tmpdir)\r
- try:\r
- with captured_stdout() as s:\r
- spawn(tar_cmd)\r
- spawn(gzip_cmd)\r
- finally:\r
- os.chdir(old_dir)\r
-\r
- self.assertTrue(os.path.exists(tarball2))\r
- # let's compare both tarballs\r
- self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))\r
-\r
- # trying an uncompressed one\r
- base_name = os.path.join(tmpdir2, 'archive')\r
- old_dir = os.getcwd()\r
- os.chdir(tmpdir)\r
- try:\r
- _make_tarball(base_name, 'dist', compress=None)\r
- finally:\r
- os.chdir(old_dir)\r
- tarball = base_name + '.tar'\r
- self.assertTrue(os.path.exists(tarball))\r
-\r
- # now for a dry_run\r
- base_name = os.path.join(tmpdir2, 'archive')\r
- old_dir = os.getcwd()\r
- os.chdir(tmpdir)\r
- try:\r
- _make_tarball(base_name, 'dist', compress=None, dry_run=True)\r
- finally:\r
- os.chdir(old_dir)\r
- tarball = base_name + '.tar'\r
- self.assertTrue(os.path.exists(tarball))\r
-\r
- @unittest.skipUnless(zlib, "Requires zlib")\r
- @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')\r
- def test_make_zipfile(self):\r
- # creating something to tar\r
- tmpdir = self.mkdtemp()\r
- self.write_file([tmpdir, 'file1'], 'xxx')\r
- self.write_file([tmpdir, 'file2'], 'xxx')\r
-\r
- tmpdir2 = self.mkdtemp()\r
- base_name = os.path.join(tmpdir2, 'archive')\r
- _make_zipfile(base_name, tmpdir)\r
-\r
- # check if the compressed tarball was created\r
- tarball = base_name + '.zip'\r
- self.assertTrue(os.path.exists(tarball))\r
-\r
-\r
- def test_make_archive(self):\r
- tmpdir = self.mkdtemp()\r
- base_name = os.path.join(tmpdir, 'archive')\r
- self.assertRaises(ValueError, make_archive, base_name, 'xxx')\r
-\r
- @unittest.skipUnless(zlib, "Requires zlib")\r
- def test_make_archive_owner_group(self):\r
- # testing make_archive with owner and group, with various combinations\r
- # this works even if there's not gid/uid support\r
- if UID_GID_SUPPORT:\r
- group = grp.getgrgid(0)[0]\r
- owner = pwd.getpwuid(0)[0]\r
- else:\r
- group = owner = 'root'\r
-\r
- base_dir, root_dir, base_name = self._create_files()\r
- base_name = os.path.join(self.mkdtemp() , 'archive')\r
- res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,\r
- group=group)\r
- self.assertTrue(os.path.exists(res))\r
-\r
- res = make_archive(base_name, 'zip', root_dir, base_dir)\r
- self.assertTrue(os.path.exists(res))\r
-\r
- res = make_archive(base_name, 'tar', root_dir, base_dir,\r
- owner=owner, group=group)\r
- self.assertTrue(os.path.exists(res))\r
-\r
- res = make_archive(base_name, 'tar', root_dir, base_dir,\r
- owner='kjhkjhkjg', group='oihohoh')\r
- self.assertTrue(os.path.exists(res))\r
-\r
- @unittest.skipUnless(zlib, "Requires zlib")\r
- @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")\r
- def test_tarfile_root_owner(self):\r
- tmpdir, tmpdir2, base_name = self._create_files()\r
- old_dir = os.getcwd()\r
- os.chdir(tmpdir)\r
- group = grp.getgrgid(0)[0]\r
- owner = pwd.getpwuid(0)[0]\r
- try:\r
- archive_name = _make_tarball(base_name, 'dist', compress=None,\r
- owner=owner, group=group)\r
- finally:\r
- os.chdir(old_dir)\r
-\r
- # check if the compressed tarball was created\r
- self.assertTrue(os.path.exists(archive_name))\r
-\r
- # now checks the rights\r
- archive = tarfile.open(archive_name)\r
- try:\r
- for member in archive.getmembers():\r
- self.assertEqual(member.uid, 0)\r
- self.assertEqual(member.gid, 0)\r
- finally:\r
- archive.close()\r
-\r
- def test_make_archive_cwd(self):\r
- current_dir = os.getcwd()\r
- def _breaks(*args, **kw):\r
- raise RuntimeError()\r
-\r
- register_archive_format('xxx', _breaks, [], 'xxx file')\r
- try:\r
- try:\r
- make_archive('xxx', 'xxx', root_dir=self.mkdtemp())\r
- except Exception:\r
- pass\r
- self.assertEqual(os.getcwd(), current_dir)\r
- finally:\r
- unregister_archive_format('xxx')\r
-\r
- def test_register_archive_format(self):\r
-\r
- self.assertRaises(TypeError, register_archive_format, 'xxx', 1)\r
- self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,\r
- 1)\r
- self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,\r
- [(1, 2), (1, 2, 3)])\r
-\r
- register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')\r
- formats = [name for name, params in get_archive_formats()]\r
- self.assertIn('xxx', formats)\r
-\r
- unregister_archive_format('xxx')\r
- formats = [name for name, params in get_archive_formats()]\r
- self.assertNotIn('xxx', formats)\r
-\r
-\r
-class TestMove(unittest.TestCase):\r
-\r
- def setUp(self):\r
- filename = "foo"\r
- self.src_dir = tempfile.mkdtemp()\r
- self.dst_dir = tempfile.mkdtemp()\r
- self.src_file = os.path.join(self.src_dir, filename)\r
- self.dst_file = os.path.join(self.dst_dir, filename)\r
- # Try to create a dir in the current directory, hoping that it is\r
- # not located on the same filesystem as the system tmp dir.\r
- try:\r
- self.dir_other_fs = tempfile.mkdtemp(\r
- dir=os.path.dirname(__file__))\r
- self.file_other_fs = os.path.join(self.dir_other_fs,\r
- filename)\r
- except OSError:\r
- self.dir_other_fs = None\r
- with open(self.src_file, "wb") as f:\r
- f.write("spam")\r
-\r
- def tearDown(self):\r
- for d in (self.src_dir, self.dst_dir, self.dir_other_fs):\r
- try:\r
- if d:\r
- shutil.rmtree(d)\r
- except:\r
- pass\r
-\r
- def _check_move_file(self, src, dst, real_dst):\r
- with open(src, "rb") as f:\r
- contents = f.read()\r
- shutil.move(src, dst)\r
- with open(real_dst, "rb") as f:\r
- self.assertEqual(contents, f.read())\r
- self.assertFalse(os.path.exists(src))\r
-\r
- def _check_move_dir(self, src, dst, real_dst):\r
- contents = sorted(os.listdir(src))\r
- shutil.move(src, dst)\r
- self.assertEqual(contents, sorted(os.listdir(real_dst)))\r
- self.assertFalse(os.path.exists(src))\r
-\r
- def test_move_file(self):\r
- # Move a file to another location on the same filesystem.\r
- self._check_move_file(self.src_file, self.dst_file, self.dst_file)\r
-\r
- def test_move_file_to_dir(self):\r
- # Move a file inside an existing dir on the same filesystem.\r
- self._check_move_file(self.src_file, self.dst_dir, self.dst_file)\r
-\r
- def test_move_file_other_fs(self):\r
- # Move a file to an existing dir on another filesystem.\r
- if not self.dir_other_fs:\r
- # skip\r
- return\r
- self._check_move_file(self.src_file, self.file_other_fs,\r
- self.file_other_fs)\r
-\r
- def test_move_file_to_dir_other_fs(self):\r
- # Move a file to another location on another filesystem.\r
- if not self.dir_other_fs:\r
- # skip\r
- return\r
- self._check_move_file(self.src_file, self.dir_other_fs,\r
- self.file_other_fs)\r
-\r
- def test_move_dir(self):\r
- # Move a dir to another location on the same filesystem.\r
- dst_dir = tempfile.mktemp()\r
- try:\r
- self._check_move_dir(self.src_dir, dst_dir, dst_dir)\r
- finally:\r
- try:\r
- shutil.rmtree(dst_dir)\r
- except:\r
- pass\r
-\r
- def test_move_dir_other_fs(self):\r
- # Move a dir to another location on another filesystem.\r
- if not self.dir_other_fs:\r
- # skip\r
- return\r
- dst_dir = tempfile.mktemp(dir=self.dir_other_fs)\r
- try:\r
- self._check_move_dir(self.src_dir, dst_dir, dst_dir)\r
- finally:\r
- try:\r
- shutil.rmtree(dst_dir)\r
- except:\r
- pass\r
-\r
- def test_move_dir_to_dir(self):\r
- # Move a dir inside an existing dir on the same filesystem.\r
- self._check_move_dir(self.src_dir, self.dst_dir,\r
- os.path.join(self.dst_dir, os.path.basename(self.src_dir)))\r
-\r
- def test_move_dir_to_dir_other_fs(self):\r
- # Move a dir inside an existing dir on another filesystem.\r
- if not self.dir_other_fs:\r
- # skip\r
- return\r
- self._check_move_dir(self.src_dir, self.dir_other_fs,\r
- os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))\r
-\r
- def test_existing_file_inside_dest_dir(self):\r
- # A file with the same name inside the destination dir already exists.\r
- with open(self.dst_file, "wb"):\r
- pass\r
- self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)\r
-\r
- def test_dont_move_dir_in_itself(self):\r
- # Moving a dir inside itself raises an Error.\r
- dst = os.path.join(self.src_dir, "bar")\r
- self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)\r
-\r
- def test_destinsrc_false_negative(self):\r
- os.mkdir(TESTFN)\r
- try:\r
- for src, dst in [('srcdir', 'srcdir/dest')]:\r
- src = os.path.join(TESTFN, src)\r
- dst = os.path.join(TESTFN, dst)\r
- self.assertTrue(shutil._destinsrc(src, dst),\r
- msg='_destinsrc() wrongly concluded that '\r
- 'dst (%s) is not in src (%s)' % (dst, src))\r
- finally:\r
- shutil.rmtree(TESTFN, ignore_errors=True)\r
-\r
- def test_destinsrc_false_positive(self):\r
- os.mkdir(TESTFN)\r
- try:\r
- for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:\r
- src = os.path.join(TESTFN, src)\r
- dst = os.path.join(TESTFN, dst)\r
- self.assertFalse(shutil._destinsrc(src, dst),\r
- msg='_destinsrc() wrongly concluded that '\r
- 'dst (%s) is in src (%s)' % (dst, src))\r
- finally:\r
- shutil.rmtree(TESTFN, ignore_errors=True)\r
-\r
-\r
-class TestCopyFile(unittest.TestCase):\r
-\r
- _delete = False\r
-\r
- class Faux(object):\r
- _entered = False\r
- _exited_with = None\r
- _raised = False\r
- def __init__(self, raise_in_exit=False, suppress_at_exit=True):\r
- self._raise_in_exit = raise_in_exit\r
- self._suppress_at_exit = suppress_at_exit\r
- def read(self, *args):\r
- return ''\r
- def __enter__(self):\r
- self._entered = True\r
- def __exit__(self, exc_type, exc_val, exc_tb):\r
- self._exited_with = exc_type, exc_val, exc_tb\r
- if self._raise_in_exit:\r
- self._raised = True\r
- raise IOError("Cannot close")\r
- return self._suppress_at_exit\r
-\r
- def tearDown(self):\r
- if self._delete:\r
- del shutil.open\r
-\r
- def _set_shutil_open(self, func):\r
- shutil.open = func\r
- self._delete = True\r
-\r
- def test_w_source_open_fails(self):\r
- def _open(filename, mode='r'):\r
- if filename == 'srcfile':\r
- raise IOError('Cannot open "srcfile"')\r
- assert 0 # shouldn't reach here.\r
-\r
- self._set_shutil_open(_open)\r
-\r
- self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')\r
-\r
- def test_w_dest_open_fails(self):\r
-\r
- srcfile = self.Faux()\r
-\r
- def _open(filename, mode='r'):\r
- if filename == 'srcfile':\r
- return srcfile\r
- if filename == 'destfile':\r
- raise IOError('Cannot open "destfile"')\r
- assert 0 # shouldn't reach here.\r
-\r
- self._set_shutil_open(_open)\r
-\r
- shutil.copyfile('srcfile', 'destfile')\r
- self.assertTrue(srcfile._entered)\r
- self.assertTrue(srcfile._exited_with[0] is IOError)\r
- self.assertEqual(srcfile._exited_with[1].args,\r
- ('Cannot open "destfile"',))\r
-\r
- def test_w_dest_close_fails(self):\r
-\r
- srcfile = self.Faux()\r
- destfile = self.Faux(True)\r
-\r
- def _open(filename, mode='r'):\r
- if filename == 'srcfile':\r
- return srcfile\r
- if filename == 'destfile':\r
- return destfile\r
- assert 0 # shouldn't reach here.\r
-\r
- self._set_shutil_open(_open)\r
-\r
- shutil.copyfile('srcfile', 'destfile')\r
- self.assertTrue(srcfile._entered)\r
- self.assertTrue(destfile._entered)\r
- self.assertTrue(destfile._raised)\r
- self.assertTrue(srcfile._exited_with[0] is IOError)\r
- self.assertEqual(srcfile._exited_with[1].args,\r
- ('Cannot close',))\r
-\r
- def test_w_source_close_fails(self):\r
-\r
- srcfile = self.Faux(True)\r
- destfile = self.Faux()\r
-\r
- def _open(filename, mode='r'):\r
- if filename == 'srcfile':\r
- return srcfile\r
- if filename == 'destfile':\r
- return destfile\r
- assert 0 # shouldn't reach here.\r
-\r
- self._set_shutil_open(_open)\r
-\r
- self.assertRaises(IOError,\r
- shutil.copyfile, 'srcfile', 'destfile')\r
- self.assertTrue(srcfile._entered)\r
- self.assertTrue(destfile._entered)\r
- self.assertFalse(destfile._raised)\r
- self.assertTrue(srcfile._exited_with[0] is None)\r
- self.assertTrue(srcfile._raised)\r
-\r
- def test_move_dir_caseinsensitive(self):\r
- # Renames a folder to the same name\r
- # but a different case.\r
-\r
- self.src_dir = tempfile.mkdtemp()\r
- dst_dir = os.path.join(\r
- os.path.dirname(self.src_dir),\r
- os.path.basename(self.src_dir).upper())\r
- self.assertNotEqual(self.src_dir, dst_dir)\r
-\r
- try:\r
- shutil.move(self.src_dir, dst_dir)\r
- self.assertTrue(os.path.isdir(dst_dir))\r
- finally:\r
- if os.path.exists(dst_dir):\r
- os.rmdir(dst_dir)\r
-\r
-\r
-\r
-def test_main():\r
- test_support.run_unittest(TestShutil, TestMove, TestCopyFile)\r
-\r
-if __name__ == '__main__':\r
- test_main()\r