+++ /dev/null
-"""Temporary files.\r
-\r
-This module provides generic, low- and high-level interfaces for\r
-creating temporary files and directories. The interfaces listed\r
-as "safe" just below can be used without fear of race conditions.\r
-Those listed as "unsafe" cannot, and are provided for backward\r
-compatibility only.\r
-\r
-This module also provides some data items to the user:\r
-\r
- TMP_MAX - maximum number of names that will be tried before\r
- giving up.\r
- template - the default prefix for all temporary names.\r
- You may change this to control the default prefix.\r
- tempdir - If this is set to a string before the first use of\r
- any routine from this module, it will be considered as\r
- another candidate location to store temporary files.\r
-"""\r
-\r
-__all__ = [\r
- "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces\r
- "SpooledTemporaryFile",\r
- "mkstemp", "mkdtemp", # low level safe interfaces\r
- "mktemp", # deprecated unsafe interface\r
- "TMP_MAX", "gettempprefix", # constants\r
- "tempdir", "gettempdir"\r
- ]\r
-\r
-\r
-# Imports.\r
-\r
-import os as _os\r
-import errno as _errno\r
-from random import Random as _Random\r
-\r
-try:\r
- from cStringIO import StringIO as _StringIO\r
-except ImportError:\r
- from StringIO import StringIO as _StringIO\r
-\r
-try:\r
- import fcntl as _fcntl\r
-except ImportError:\r
- def _set_cloexec(fd):\r
- pass\r
-else:\r
- def _set_cloexec(fd):\r
- try:\r
- flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)\r
- except IOError:\r
- pass\r
- else:\r
- # flags read successfully, modify\r
- flags |= _fcntl.FD_CLOEXEC\r
- _fcntl.fcntl(fd, _fcntl.F_SETFD, flags)\r
-\r
-\r
-try:\r
- import thread as _thread\r
-except ImportError:\r
- import dummy_thread as _thread\r
-_allocate_lock = _thread.allocate_lock\r
-\r
-_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL\r
-if hasattr(_os, 'O_NOINHERIT'):\r
- _text_openflags |= _os.O_NOINHERIT\r
-if hasattr(_os, 'O_NOFOLLOW'):\r
- _text_openflags |= _os.O_NOFOLLOW\r
-\r
-_bin_openflags = _text_openflags\r
-if hasattr(_os, 'O_BINARY'):\r
- _bin_openflags |= _os.O_BINARY\r
-\r
-if hasattr(_os, 'TMP_MAX'):\r
- TMP_MAX = _os.TMP_MAX\r
-else:\r
- TMP_MAX = 10000\r
-\r
-template = "tmp"\r
-\r
-# Internal routines.\r
-\r
-_once_lock = _allocate_lock()\r
-\r
-if hasattr(_os, "lstat"):\r
- _stat = _os.lstat\r
-elif hasattr(_os, "stat"):\r
- _stat = _os.stat\r
-else:\r
- # Fallback. All we need is something that raises os.error if the\r
- # file doesn't exist.\r
- def _stat(fn):\r
- try:\r
- f = open(fn)\r
- except IOError:\r
- raise _os.error\r
- f.close()\r
-\r
-def _exists(fn):\r
- try:\r
- _stat(fn)\r
- except _os.error:\r
- return False\r
- else:\r
- return True\r
-\r
-class _RandomNameSequence:\r
- """An instance of _RandomNameSequence generates an endless\r
- sequence of unpredictable strings which can safely be incorporated\r
- into file names. Each string is six characters long. Multiple\r
- threads can safely use the same instance at the same time.\r
-\r
- _RandomNameSequence is an iterator."""\r
-\r
- characters = ("abcdefghijklmnopqrstuvwxyz" +\r
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +\r
- "0123456789_")\r
-\r
- def __init__(self):\r
- self.mutex = _allocate_lock()\r
- self.rng = _Random()\r
- self.normcase = _os.path.normcase\r
-\r
- def __iter__(self):\r
- return self\r
-\r
- def next(self):\r
- m = self.mutex\r
- c = self.characters\r
- choose = self.rng.choice\r
-\r
- m.acquire()\r
- try:\r
- letters = [choose(c) for dummy in "123456"]\r
- finally:\r
- m.release()\r
-\r
- return self.normcase(''.join(letters))\r
-\r
-def _candidate_tempdir_list():\r
- """Generate a list of candidate temporary directories which\r
- _get_default_tempdir will try."""\r
-\r
- dirlist = []\r
-\r
- # First, try the environment.\r
- for envname in 'TMPDIR', 'TEMP', 'TMP':\r
- dirname = _os.getenv(envname)\r
- if dirname: dirlist.append(dirname)\r
-\r
- # Failing that, try OS-specific locations.\r
- if _os.name == 'riscos':\r
- dirname = _os.getenv('Wimp$ScrapDir')\r
- if dirname: dirlist.append(dirname)\r
- elif _os.name == 'nt':\r
- dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])\r
- else:\r
- dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])\r
-\r
- # As a last resort, the current directory.\r
- try:\r
- dirlist.append(_os.getcwd())\r
- except (AttributeError, _os.error):\r
- dirlist.append(_os.curdir)\r
-\r
- return dirlist\r
-\r
-def _get_default_tempdir():\r
- """Calculate the default directory to use for temporary files.\r
- This routine should be called exactly once.\r
-\r
- We determine whether or not a candidate temp dir is usable by\r
- trying to create and write to a file in that directory. If this\r
- is successful, the test file is deleted. To prevent denial of\r
- service, the name of the test file must be randomized."""\r
-\r
- namer = _RandomNameSequence()\r
- dirlist = _candidate_tempdir_list()\r
- flags = _text_openflags\r
-\r
- for dir in dirlist:\r
- if dir != _os.curdir:\r
- dir = _os.path.normcase(_os.path.abspath(dir))\r
- # Try only a few names per directory.\r
- for seq in xrange(100):\r
- name = namer.next()\r
- filename = _os.path.join(dir, name)\r
- try:\r
- fd = _os.open(filename, flags, 0600)\r
- fp = _os.fdopen(fd, 'w')\r
- fp.write('blat')\r
- fp.close()\r
- _os.unlink(filename)\r
- del fp, fd\r
- return dir\r
- except (OSError, IOError), e:\r
- if e[0] != _errno.EEXIST:\r
- break # no point trying more names in this directory\r
- pass\r
- raise IOError, (_errno.ENOENT,\r
- ("No usable temporary directory found in %s" % dirlist))\r
-\r
-_name_sequence = None\r
-\r
-def _get_candidate_names():\r
- """Common setup sequence for all user-callable interfaces."""\r
-\r
- global _name_sequence\r
- if _name_sequence is None:\r
- _once_lock.acquire()\r
- try:\r
- if _name_sequence is None:\r
- _name_sequence = _RandomNameSequence()\r
- finally:\r
- _once_lock.release()\r
- return _name_sequence\r
-\r
-\r
-def _mkstemp_inner(dir, pre, suf, flags):\r
- """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""\r
-\r
- names = _get_candidate_names()\r
-\r
- for seq in xrange(TMP_MAX):\r
- name = names.next()\r
- file = _os.path.join(dir, pre + name + suf)\r
- try:\r
- fd = _os.open(file, flags, 0600)\r
- _set_cloexec(fd)\r
- return (fd, _os.path.abspath(file))\r
- except OSError, e:\r
- if e.errno == _errno.EEXIST:\r
- continue # try again\r
- raise\r
-\r
- raise IOError, (_errno.EEXIST, "No usable temporary file name found")\r
-\r
-\r
-# User visible interfaces.\r
-\r
-def gettempprefix():\r
- """Accessor for tempdir.template."""\r
- return template\r
-\r
-tempdir = None\r
-\r
-def gettempdir():\r
- """Accessor for tempfile.tempdir."""\r
- global tempdir\r
- if tempdir is None:\r
- _once_lock.acquire()\r
- try:\r
- if tempdir is None:\r
- tempdir = _get_default_tempdir()\r
- finally:\r
- _once_lock.release()\r
- return tempdir\r
-\r
-def mkstemp(suffix="", prefix=template, dir=None, text=False):\r
- """User-callable function to create and return a unique temporary\r
- file. The return value is a pair (fd, name) where fd is the\r
- file descriptor returned by os.open, and name is the filename.\r
-\r
- If 'suffix' is specified, the file name will end with that suffix,\r
- otherwise there will be no suffix.\r
-\r
- If 'prefix' is specified, the file name will begin with that prefix,\r
- otherwise a default prefix is used.\r
-\r
- If 'dir' is specified, the file will be created in that directory,\r
- otherwise a default directory is used.\r
-\r
- If 'text' is specified and true, the file is opened in text\r
- mode. Else (the default) the file is opened in binary mode. On\r
- some operating systems, this makes no difference.\r
-\r
- The file is readable and writable only by the creating user ID.\r
- If the operating system uses permission bits to indicate whether a\r
- file is executable, the file is executable by no one. The file\r
- descriptor is not inherited by children of this process.\r
-\r
- Caller is responsible for deleting the file when done with it.\r
- """\r
-\r
- if dir is None:\r
- dir = gettempdir()\r
-\r
- if text:\r
- flags = _text_openflags\r
- else:\r
- flags = _bin_openflags\r
-\r
- return _mkstemp_inner(dir, prefix, suffix, flags)\r
-\r
-\r
-def mkdtemp(suffix="", prefix=template, dir=None):\r
- """User-callable function to create and return a unique temporary\r
- directory. The return value is the pathname of the directory.\r
-\r
- Arguments are as for mkstemp, except that the 'text' argument is\r
- not accepted.\r
-\r
- The directory is readable, writable, and searchable only by the\r
- creating user.\r
-\r
- Caller is responsible for deleting the directory when done with it.\r
- """\r
-\r
- if dir is None:\r
- dir = gettempdir()\r
-\r
- names = _get_candidate_names()\r
-\r
- for seq in xrange(TMP_MAX):\r
- name = names.next()\r
- file = _os.path.join(dir, prefix + name + suffix)\r
- try:\r
- _os.mkdir(file, 0700)\r
- return file\r
- except OSError, e:\r
- if e.errno == _errno.EEXIST:\r
- continue # try again\r
- raise\r
-\r
- raise IOError, (_errno.EEXIST, "No usable temporary directory name found")\r
-\r
-def mktemp(suffix="", prefix=template, dir=None):\r
- """User-callable function to return a unique temporary file name. The\r
- file is not created.\r
-\r
- Arguments are as for mkstemp, except that the 'text' argument is\r
- not accepted.\r
-\r
- This function is unsafe and should not be used. The file name\r
- refers to a file that did not exist at some point, but by the time\r
- you get around to creating it, someone else may have beaten you to\r
- the punch.\r
- """\r
-\r
-## from warnings import warn as _warn\r
-## _warn("mktemp is a potential security risk to your program",\r
-## RuntimeWarning, stacklevel=2)\r
-\r
- if dir is None:\r
- dir = gettempdir()\r
-\r
- names = _get_candidate_names()\r
- for seq in xrange(TMP_MAX):\r
- name = names.next()\r
- file = _os.path.join(dir, prefix + name + suffix)\r
- if not _exists(file):\r
- return file\r
-\r
- raise IOError, (_errno.EEXIST, "No usable temporary filename found")\r
-\r
-\r
-class _TemporaryFileWrapper:\r
- """Temporary file wrapper\r
-\r
- This class provides a wrapper around files opened for\r
- temporary use. In particular, it seeks to automatically\r
- remove the file when it is no longer needed.\r
- """\r
-\r
- def __init__(self, file, name, delete=True):\r
- self.file = file\r
- self.name = name\r
- self.close_called = False\r
- self.delete = delete\r
-\r
- def __getattr__(self, name):\r
- # Attribute lookups are delegated to the underlying file\r
- # and cached for non-numeric results\r
- # (i.e. methods are cached, closed and friends are not)\r
- file = self.__dict__['file']\r
- a = getattr(file, name)\r
- if not issubclass(type(a), type(0)):\r
- setattr(self, name, a)\r
- return a\r
-\r
- # The underlying __enter__ method returns the wrong object\r
- # (self.file) so override it to return the wrapper\r
- def __enter__(self):\r
- self.file.__enter__()\r
- return self\r
-\r
- # NT provides delete-on-close as a primitive, so we don't need\r
- # the wrapper to do anything special. We still use it so that\r
- # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.\r
- if _os.name != 'nt':\r
- # Cache the unlinker so we don't get spurious errors at\r
- # shutdown when the module-level "os" is None'd out. Note\r
- # that this must be referenced as self.unlink, because the\r
- # name TemporaryFileWrapper may also get None'd out before\r
- # __del__ is called.\r
- unlink = _os.unlink\r
-\r
- def close(self):\r
- if not self.close_called:\r
- self.close_called = True\r
- self.file.close()\r
- if self.delete:\r
- self.unlink(self.name)\r
-\r
- def __del__(self):\r
- self.close()\r
-\r
- # Need to trap __exit__ as well to ensure the file gets\r
- # deleted when used in a with statement\r
- def __exit__(self, exc, value, tb):\r
- result = self.file.__exit__(exc, value, tb)\r
- self.close()\r
- return result\r
- else:\r
- def __exit__(self, exc, value, tb):\r
- self.file.__exit__(exc, value, tb)\r
-\r
-\r
-def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",\r
- prefix=template, dir=None, delete=True):\r
- """Create and return a temporary file.\r
- Arguments:\r
- 'prefix', 'suffix', 'dir' -- as for mkstemp.\r
- 'mode' -- the mode argument to os.fdopen (default "w+b").\r
- 'bufsize' -- the buffer size argument to os.fdopen (default -1).\r
- 'delete' -- whether the file is deleted on close (default True).\r
- The file is created as mkstemp() would do it.\r
-\r
- Returns an object with a file-like interface; the name of the file\r
- is accessible as file.name. The file will be automatically deleted\r
- when it is closed unless the 'delete' argument is set to False.\r
- """\r
-\r
- if dir is None:\r
- dir = gettempdir()\r
-\r
- if 'b' in mode:\r
- flags = _bin_openflags\r
- else:\r
- flags = _text_openflags\r
-\r
- # Setting O_TEMPORARY in the flags causes the OS to delete\r
- # the file when it is closed. This is only supported by Windows.\r
- if _os.name == 'nt' and delete:\r
- flags |= _os.O_TEMPORARY\r
-\r
- (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)\r
- file = _os.fdopen(fd, mode, bufsize)\r
- return _TemporaryFileWrapper(file, name, delete)\r
-\r
-if _os.name != 'posix' or _os.sys.platform == 'cygwin':\r
- # On non-POSIX and Cygwin systems, assume that we cannot unlink a file\r
- # while it is open.\r
- TemporaryFile = NamedTemporaryFile\r
-\r
-else:\r
- def TemporaryFile(mode='w+b', bufsize=-1, suffix="",\r
- prefix=template, dir=None):\r
- """Create and return a temporary file.\r
- Arguments:\r
- 'prefix', 'suffix', 'dir' -- as for mkstemp.\r
- 'mode' -- the mode argument to os.fdopen (default "w+b").\r
- 'bufsize' -- the buffer size argument to os.fdopen (default -1).\r
- The file is created as mkstemp() would do it.\r
-\r
- Returns an object with a file-like interface. The file has no\r
- name, and will cease to exist when it is closed.\r
- """\r
-\r
- if dir is None:\r
- dir = gettempdir()\r
-\r
- if 'b' in mode:\r
- flags = _bin_openflags\r
- else:\r
- flags = _text_openflags\r
-\r
- (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)\r
- try:\r
- _os.unlink(name)\r
- return _os.fdopen(fd, mode, bufsize)\r
- except:\r
- _os.close(fd)\r
- raise\r
-\r
-class SpooledTemporaryFile:\r
- """Temporary file wrapper, specialized to switch from\r
- StringIO to a real file when it exceeds a certain size or\r
- when a fileno is needed.\r
- """\r
- _rolled = False\r
-\r
- def __init__(self, max_size=0, mode='w+b', bufsize=-1,\r
- suffix="", prefix=template, dir=None):\r
- self._file = _StringIO()\r
- self._max_size = max_size\r
- self._rolled = False\r
- self._TemporaryFileArgs = (mode, bufsize, suffix, prefix, dir)\r
-\r
- def _check(self, file):\r
- if self._rolled: return\r
- max_size = self._max_size\r
- if max_size and file.tell() > max_size:\r
- self.rollover()\r
-\r
- def rollover(self):\r
- if self._rolled: return\r
- file = self._file\r
- newfile = self._file = TemporaryFile(*self._TemporaryFileArgs)\r
- del self._TemporaryFileArgs\r
-\r
- newfile.write(file.getvalue())\r
- newfile.seek(file.tell(), 0)\r
-\r
- self._rolled = True\r
-\r
- # The method caching trick from NamedTemporaryFile\r
- # won't work here, because _file may change from a\r
- # _StringIO instance to a real file. So we list\r
- # all the methods directly.\r
-\r
- # Context management protocol\r
- def __enter__(self):\r
- if self._file.closed:\r
- raise ValueError("Cannot enter context with closed file")\r
- return self\r
-\r
- def __exit__(self, exc, value, tb):\r
- self._file.close()\r
-\r
- # file protocol\r
- def __iter__(self):\r
- return self._file.__iter__()\r
-\r
- def close(self):\r
- self._file.close()\r
-\r
- @property\r
- def closed(self):\r
- return self._file.closed\r
-\r
- @property\r
- def encoding(self):\r
- return self._file.encoding\r
-\r
- def fileno(self):\r
- self.rollover()\r
- return self._file.fileno()\r
-\r
- def flush(self):\r
- self._file.flush()\r
-\r
- def isatty(self):\r
- return self._file.isatty()\r
-\r
- @property\r
- def mode(self):\r
- return self._file.mode\r
-\r
- @property\r
- def name(self):\r
- return self._file.name\r
-\r
- @property\r
- def newlines(self):\r
- return self._file.newlines\r
-\r
- def next(self):\r
- return self._file.next\r
-\r
- def read(self, *args):\r
- return self._file.read(*args)\r
-\r
- def readline(self, *args):\r
- return self._file.readline(*args)\r
-\r
- def readlines(self, *args):\r
- return self._file.readlines(*args)\r
-\r
- def seek(self, *args):\r
- self._file.seek(*args)\r
-\r
- @property\r
- def softspace(self):\r
- return self._file.softspace\r
-\r
- def tell(self):\r
- return self._file.tell()\r
-\r
- def truncate(self):\r
- self._file.truncate()\r
-\r
- def write(self, s):\r
- file = self._file\r
- rv = file.write(s)\r
- self._check(file)\r
- return rv\r
-\r
- def writelines(self, iterable):\r
- file = self._file\r
- rv = file.writelines(iterable)\r
- self._check(file)\r
- return rv\r
-\r
- def xreadlines(self, *args):\r
- return self._file.xreadlines(*args)\r