+++ /dev/null
-"""Utility functions for copying and archiving files and directory trees.\r
-\r
-XXX The functions here don't copy the resource fork or other metadata on Mac.\r
-\r
-"""\r
-\r
-import os\r
-import sys\r
-import stat\r
-from os.path import abspath\r
-import fnmatch\r
-import collections\r
-import errno\r
-\r
-try:\r
- from pwd import getpwnam\r
-except ImportError:\r
- getpwnam = None\r
-\r
-try:\r
- from grp import getgrnam\r
-except ImportError:\r
- getgrnam = None\r
-\r
-__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",\r
- "copytree", "move", "rmtree", "Error", "SpecialFileError",\r
- "ExecError", "make_archive", "get_archive_formats",\r
- "register_archive_format", "unregister_archive_format"]\r
-\r
-class Error(EnvironmentError):\r
- pass\r
-\r
-class SpecialFileError(EnvironmentError):\r
- """Raised when trying to do a kind of operation (e.g. copying) which is\r
- not supported on a special file (e.g. a named pipe)"""\r
-\r
-class ExecError(EnvironmentError):\r
- """Raised when a command could not be executed"""\r
-\r
-try:\r
- WindowsError\r
-except NameError:\r
- WindowsError = None\r
-\r
-def copyfileobj(fsrc, fdst, length=16*1024):\r
- """copy data from file-like object fsrc to file-like object fdst"""\r
- while 1:\r
- buf = fsrc.read(length)\r
- if not buf:\r
- break\r
- fdst.write(buf)\r
-\r
-def _samefile(src, dst):\r
- # Macintosh, Unix.\r
- if hasattr(os.path, 'samefile'):\r
- try:\r
- return os.path.samefile(src, dst)\r
- except OSError:\r
- return False\r
-\r
- # All other platforms: check for same pathname.\r
- return (os.path.normcase(os.path.abspath(src)) ==\r
- os.path.normcase(os.path.abspath(dst)))\r
-\r
-def copyfile(src, dst):\r
- """Copy data from src to dst"""\r
- if _samefile(src, dst):\r
- raise Error("`%s` and `%s` are the same file" % (src, dst))\r
-\r
- for fn in [src, dst]:\r
- try:\r
- st = os.stat(fn)\r
- except OSError:\r
- # File most likely does not exist\r
- pass\r
- else:\r
- # XXX What about other special files? (sockets, devices...)\r
- if stat.S_ISFIFO(st.st_mode):\r
- raise SpecialFileError("`%s` is a named pipe" % fn)\r
-\r
- with open(src, 'rb') as fsrc:\r
- with open(dst, 'wb') as fdst:\r
- copyfileobj(fsrc, fdst)\r
-\r
-def copymode(src, dst):\r
- """Copy mode bits from src to dst"""\r
- if hasattr(os, 'chmod'):\r
- st = os.stat(src)\r
- mode = stat.S_IMODE(st.st_mode)\r
- os.chmod(dst, mode)\r
-\r
-def copystat(src, dst):\r
- """Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""\r
- st = os.stat(src)\r
- mode = stat.S_IMODE(st.st_mode)\r
- if hasattr(os, 'utime'):\r
- os.utime(dst, (st.st_atime, st.st_mtime))\r
- if hasattr(os, 'chmod'):\r
- os.chmod(dst, mode)\r
- if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):\r
- try:\r
- os.chflags(dst, st.st_flags)\r
- except OSError, why:\r
- if (not hasattr(errno, 'EOPNOTSUPP') or\r
- why.errno != errno.EOPNOTSUPP):\r
- raise\r
-\r
-def copy(src, dst):\r
- """Copy data and mode bits ("cp src dst").\r
-\r
- The destination may be a directory.\r
-\r
- """\r
- if os.path.isdir(dst):\r
- dst = os.path.join(dst, os.path.basename(src))\r
- copyfile(src, dst)\r
- copymode(src, dst)\r
-\r
-def copy2(src, dst):\r
- """Copy data and all stat info ("cp -p src dst").\r
-\r
- The destination may be a directory.\r
-\r
- """\r
- if os.path.isdir(dst):\r
- dst = os.path.join(dst, os.path.basename(src))\r
- copyfile(src, dst)\r
- copystat(src, dst)\r
-\r
-def ignore_patterns(*patterns):\r
- """Function that can be used as copytree() ignore parameter.\r
-\r
- Patterns is a sequence of glob-style patterns\r
- that are used to exclude files"""\r
- def _ignore_patterns(path, names):\r
- ignored_names = []\r
- for pattern in patterns:\r
- ignored_names.extend(fnmatch.filter(names, pattern))\r
- return set(ignored_names)\r
- return _ignore_patterns\r
-\r
-def copytree(src, dst, symlinks=False, ignore=None):\r
- """Recursively copy a directory tree using copy2().\r
-\r
- The destination directory must not already exist.\r
- If exception(s) occur, an Error is raised with a list of reasons.\r
-\r
- If the optional symlinks flag is true, symbolic links in the\r
- source tree result in symbolic links in the destination tree; if\r
- it is false, the contents of the files pointed to by symbolic\r
- links are copied.\r
-\r
- The optional ignore argument is a callable. If given, it\r
- is called with the `src` parameter, which is the directory\r
- being visited by copytree(), and `names` which is the list of\r
- `src` contents, as returned by os.listdir():\r
-\r
- callable(src, names) -> ignored_names\r
-\r
- Since copytree() is called recursively, the callable will be\r
- called once for each directory that is copied. It returns a\r
- list of names relative to the `src` directory that should\r
- not be copied.\r
-\r
- XXX Consider this example code rather than the ultimate tool.\r
-\r
- """\r
- names = os.listdir(src)\r
- if ignore is not None:\r
- ignored_names = ignore(src, names)\r
- else:\r
- ignored_names = set()\r
-\r
- os.makedirs(dst)\r
- errors = []\r
- for name in names:\r
- if name in ignored_names:\r
- continue\r
- srcname = os.path.join(src, name)\r
- dstname = os.path.join(dst, name)\r
- try:\r
- if symlinks and os.path.islink(srcname):\r
- linkto = os.readlink(srcname)\r
- os.symlink(linkto, dstname)\r
- elif os.path.isdir(srcname):\r
- copytree(srcname, dstname, symlinks, ignore)\r
- else:\r
- # Will raise a SpecialFileError for unsupported file types\r
- copy2(srcname, dstname)\r
- # catch the Error from the recursive copytree so that we can\r
- # continue with other files\r
- except Error, err:\r
- errors.extend(err.args[0])\r
- except EnvironmentError, why:\r
- errors.append((srcname, dstname, str(why)))\r
- try:\r
- copystat(src, dst)\r
- except OSError, why:\r
- if WindowsError is not None and isinstance(why, WindowsError):\r
- # Copying file access times may fail on Windows\r
- pass\r
- else:\r
- errors.extend((src, dst, str(why)))\r
- if errors:\r
- raise Error, errors\r
-\r
-def rmtree(path, ignore_errors=False, onerror=None):\r
- """Recursively delete a directory tree.\r
-\r
- If ignore_errors is set, errors are ignored; otherwise, if onerror\r
- is set, it is called to handle the error with arguments (func,\r
- path, exc_info) where func is os.listdir, os.remove, or os.rmdir;\r
- path is the argument to that function that caused it to fail; and\r
- exc_info is a tuple returned by sys.exc_info(). If ignore_errors\r
- is false and onerror is None, an exception is raised.\r
-\r
- """\r
- if ignore_errors:\r
- def onerror(*args):\r
- pass\r
- elif onerror is None:\r
- def onerror(*args):\r
- raise\r
- try:\r
- if os.path.islink(path):\r
- # symlinks to directories are forbidden, see bug #1669\r
- raise OSError("Cannot call rmtree on a symbolic link")\r
- except OSError:\r
- onerror(os.path.islink, path, sys.exc_info())\r
- # can't continue even if onerror hook returns\r
- return\r
- names = []\r
- try:\r
- names = os.listdir(path)\r
- except os.error, err:\r
- onerror(os.listdir, path, sys.exc_info())\r
- for name in names:\r
- fullname = os.path.join(path, name)\r
- try:\r
- mode = os.lstat(fullname).st_mode\r
- except os.error:\r
- mode = 0\r
- if stat.S_ISDIR(mode):\r
- rmtree(fullname, ignore_errors, onerror)\r
- else:\r
- try:\r
- os.remove(fullname)\r
- except os.error, err:\r
- onerror(os.remove, fullname, sys.exc_info())\r
- try:\r
- os.rmdir(path)\r
- except os.error:\r
- onerror(os.rmdir, path, sys.exc_info())\r
-\r
-\r
-def _basename(path):\r
- # A basename() variant which first strips the trailing slash, if present.\r
- # Thus we always get the last component of the path, even for directories.\r
- return os.path.basename(path.rstrip(os.path.sep))\r
-\r
-def move(src, dst):\r
- """Recursively move a file or directory to another location. This is\r
- similar to the Unix "mv" command.\r
-\r
- If the destination is a directory or a symlink to a directory, the source\r
- is moved inside the directory. The destination path must not already\r
- exist.\r
-\r
- If the destination already exists but is not a directory, it may be\r
- overwritten depending on os.rename() semantics.\r
-\r
- If the destination is on our current filesystem, then rename() is used.\r
- Otherwise, src is copied to the destination and then removed.\r
- A lot more could be done here... A look at a mv.c shows a lot of\r
- the issues this implementation glosses over.\r
-\r
- """\r
- real_dst = dst\r
- if os.path.isdir(dst):\r
- if _samefile(src, dst):\r
- # We might be on a case insensitive filesystem,\r
- # perform the rename anyway.\r
- os.rename(src, dst)\r
- return\r
-\r
- real_dst = os.path.join(dst, _basename(src))\r
- if os.path.exists(real_dst):\r
- raise Error, "Destination path '%s' already exists" % real_dst\r
- try:\r
- os.rename(src, real_dst)\r
- except OSError:\r
- if os.path.isdir(src):\r
- if _destinsrc(src, dst):\r
- raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst)\r
- copytree(src, real_dst, symlinks=True)\r
- rmtree(src)\r
- else:\r
- copy2(src, real_dst)\r
- os.unlink(src)\r
-\r
-def _destinsrc(src, dst):\r
- src = abspath(src)\r
- dst = abspath(dst)\r
- if not src.endswith(os.path.sep):\r
- src += os.path.sep\r
- if not dst.endswith(os.path.sep):\r
- dst += os.path.sep\r
- return dst.startswith(src)\r
-\r
-def _get_gid(name):\r
- """Returns a gid, given a group name."""\r
- if getgrnam is None or name is None:\r
- return None\r
- try:\r
- result = getgrnam(name)\r
- except KeyError:\r
- result = None\r
- if result is not None:\r
- return result[2]\r
- return None\r
-\r
-def _get_uid(name):\r
- """Returns an uid, given a user name."""\r
- if getpwnam is None or name is None:\r
- return None\r
- try:\r
- result = getpwnam(name)\r
- except KeyError:\r
- result = None\r
- if result is not None:\r
- return result[2]\r
- return None\r
-\r
-def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,\r
- owner=None, group=None, logger=None):\r
- """Create a (possibly compressed) tar file from all the files under\r
- 'base_dir'.\r
-\r
- 'compress' must be "gzip" (the default), "bzip2", or None.\r
-\r
- 'owner' and 'group' can be used to define an owner and a group for the\r
- archive that is being built. If not provided, the current owner and group\r
- will be used.\r
-\r
- The output tar file will be named 'base_name' + ".tar", possibly plus\r
- the appropriate compression extension (".gz", or ".bz2").\r
-\r
- Returns the output filename.\r
- """\r
- tar_compression = {'gzip': 'gz', 'bzip2': 'bz2', None: ''}\r
- compress_ext = {'gzip': '.gz', 'bzip2': '.bz2'}\r
-\r
- # flags for compression program, each element of list will be an argument\r
- if compress is not None and compress not in compress_ext.keys():\r
- raise ValueError, \\r
- ("bad value for 'compress': must be None, 'gzip' or 'bzip2'")\r
-\r
- archive_name = base_name + '.tar' + compress_ext.get(compress, '')\r
- archive_dir = os.path.dirname(archive_name)\r
-\r
- if not os.path.exists(archive_dir):\r
- logger.info("creating %s" % archive_dir)\r
- if not dry_run:\r
- os.makedirs(archive_dir)\r
-\r
-\r
- # creating the tarball\r
- import tarfile # late import so Python build itself doesn't break\r
-\r
- if logger is not None:\r
- logger.info('Creating tar archive')\r
-\r
- uid = _get_uid(owner)\r
- gid = _get_gid(group)\r
-\r
- def _set_uid_gid(tarinfo):\r
- if gid is not None:\r
- tarinfo.gid = gid\r
- tarinfo.gname = group\r
- if uid is not None:\r
- tarinfo.uid = uid\r
- tarinfo.uname = owner\r
- return tarinfo\r
-\r
- if not dry_run:\r
- tar = tarfile.open(archive_name, 'w|%s' % tar_compression[compress])\r
- try:\r
- tar.add(base_dir, filter=_set_uid_gid)\r
- finally:\r
- tar.close()\r
-\r
- return archive_name\r
-\r
-def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False):\r
- # XXX see if we want to keep an external call here\r
- if verbose:\r
- zipoptions = "-r"\r
- else:\r
- zipoptions = "-rq"\r
- from distutils.errors import DistutilsExecError\r
- from distutils.spawn import spawn\r
- try:\r
- spawn(["zip", zipoptions, zip_filename, base_dir], dry_run=dry_run)\r
- except DistutilsExecError:\r
- # XXX really should distinguish between "couldn't find\r
- # external 'zip' command" and "zip failed".\r
- raise ExecError, \\r
- ("unable to create zip file '%s': "\r
- "could neither import the 'zipfile' module nor "\r
- "find a standalone zip utility") % zip_filename\r
-\r
-def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):\r
- """Create a zip file from all the files under 'base_dir'.\r
-\r
- The output zip file will be named 'base_name' + ".zip". Uses either the\r
- "zipfile" Python module (if available) or the InfoZIP "zip" utility\r
- (if installed and found on the default search path). If neither tool is\r
- available, raises ExecError. Returns the name of the output zip\r
- file.\r
- """\r
- zip_filename = base_name + ".zip"\r
- archive_dir = os.path.dirname(base_name)\r
-\r
- if not os.path.exists(archive_dir):\r
- if logger is not None:\r
- logger.info("creating %s", archive_dir)\r
- if not dry_run:\r
- os.makedirs(archive_dir)\r
-\r
- # If zipfile module is not available, try spawning an external 'zip'\r
- # command.\r
- try:\r
- import zipfile\r
- except ImportError:\r
- zipfile = None\r
-\r
- if zipfile is None:\r
- _call_external_zip(base_dir, zip_filename, verbose, dry_run)\r
- else:\r
- if logger is not None:\r
- logger.info("creating '%s' and adding '%s' to it",\r
- zip_filename, base_dir)\r
-\r
- if not dry_run:\r
- zip = zipfile.ZipFile(zip_filename, "w",\r
- compression=zipfile.ZIP_DEFLATED)\r
-\r
- for dirpath, dirnames, filenames in os.walk(base_dir):\r
- for name in filenames:\r
- path = os.path.normpath(os.path.join(dirpath, name))\r
- if os.path.isfile(path):\r
- zip.write(path, path)\r
- if logger is not None:\r
- logger.info("adding '%s'", path)\r
- zip.close()\r
-\r
- return zip_filename\r
-\r
-_ARCHIVE_FORMATS = {\r
- 'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),\r
- 'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),\r
- 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),\r
- 'zip': (_make_zipfile, [],"ZIP file")\r
- }\r
-\r
-def get_archive_formats():\r
- """Returns a list of supported formats for archiving and unarchiving.\r
-\r
- Each element of the returned sequence is a tuple (name, description)\r
- """\r
- formats = [(name, registry[2]) for name, registry in\r
- _ARCHIVE_FORMATS.items()]\r
- formats.sort()\r
- return formats\r
-\r
-def register_archive_format(name, function, extra_args=None, description=''):\r
- """Registers an archive format.\r
-\r
- name is the name of the format. function is the callable that will be\r
- used to create archives. If provided, extra_args is a sequence of\r
- (name, value) tuples that will be passed as arguments to the callable.\r
- description can be provided to describe the format, and will be returned\r
- by the get_archive_formats() function.\r
- """\r
- if extra_args is None:\r
- extra_args = []\r
- if not isinstance(function, collections.Callable):\r
- raise TypeError('The %s object is not callable' % function)\r
- if not isinstance(extra_args, (tuple, list)):\r
- raise TypeError('extra_args needs to be a sequence')\r
- for element in extra_args:\r
- if not isinstance(element, (tuple, list)) or len(element) !=2 :\r
- raise TypeError('extra_args elements are : (arg_name, value)')\r
-\r
- _ARCHIVE_FORMATS[name] = (function, extra_args, description)\r
-\r
-def unregister_archive_format(name):\r
- del _ARCHIVE_FORMATS[name]\r
-\r
-def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,\r
- dry_run=0, owner=None, group=None, logger=None):\r
- """Create an archive file (eg. zip or tar).\r
-\r
- 'base_name' is the name of the file to create, minus any format-specific\r
- extension; 'format' is the archive format: one of "zip", "tar", "bztar"\r
- or "gztar".\r
-\r
- 'root_dir' is a directory that will be the root directory of the\r
- archive; ie. we typically chdir into 'root_dir' before creating the\r
- archive. 'base_dir' is the directory where we start archiving from;\r
- ie. 'base_dir' will be the common prefix of all files and\r
- directories in the archive. 'root_dir' and 'base_dir' both default\r
- to the current directory. Returns the name of the archive file.\r
-\r
- 'owner' and 'group' are used when creating a tar archive. By default,\r
- uses the current owner and group.\r
- """\r
- save_cwd = os.getcwd()\r
- if root_dir is not None:\r
- if logger is not None:\r
- logger.debug("changing into '%s'", root_dir)\r
- base_name = os.path.abspath(base_name)\r
- if not dry_run:\r
- os.chdir(root_dir)\r
-\r
- if base_dir is None:\r
- base_dir = os.curdir\r
-\r
- kwargs = {'dry_run': dry_run, 'logger': logger}\r
-\r
- try:\r
- format_info = _ARCHIVE_FORMATS[format]\r
- except KeyError:\r
- raise ValueError, "unknown archive format '%s'" % format\r
-\r
- func = format_info[0]\r
- for arg, val in format_info[1]:\r
- kwargs[arg] = val\r
-\r
- if format != 'zip':\r
- kwargs['owner'] = owner\r
- kwargs['group'] = group\r
-\r
- try:\r
- filename = func(base_name, base_dir, **kwargs)\r
- finally:\r
- if root_dir is not None:\r
- if logger is not None:\r
- logger.debug("changing back to '%s'", save_cwd)\r
- os.chdir(save_cwd)\r
-\r
- return filename\r