+++ /dev/null
-#! /usr/bin/env python\r
-\r
-"""RCS Proxy.\r
-\r
-Provide a simplified interface on RCS files, locally or remotely.\r
-The functionality is geared towards implementing some sort of\r
-remote CVS like utility. It is modeled after the similar module\r
-FSProxy.\r
-\r
-The module defines two classes:\r
-\r
-RCSProxyLocal -- used for local access\r
-RCSProxyServer -- used on the server side of remote access\r
-\r
-The corresponding client class, RCSProxyClient, is defined in module\r
-rcsclient.\r
-\r
-The remote classes are instantiated with an IP address and an optional\r
-verbosity flag.\r
-"""\r
-\r
-import server\r
-import md5\r
-import os\r
-import fnmatch\r
-import string\r
-import tempfile\r
-import rcslib\r
-\r
-\r
-class DirSupport:\r
-\r
- def __init__(self):\r
- self._dirstack = []\r
-\r
- def __del__(self):\r
- self._close()\r
-\r
- def _close(self):\r
- while self._dirstack:\r
- self.back()\r
-\r
- def pwd(self):\r
- return os.getcwd()\r
-\r
- def cd(self, name):\r
- save = os.getcwd()\r
- os.chdir(name)\r
- self._dirstack.append(save)\r
-\r
- def back(self):\r
- if not self._dirstack:\r
- raise os.error, "empty directory stack"\r
- dir = self._dirstack[-1]\r
- os.chdir(dir)\r
- del self._dirstack[-1]\r
-\r
- def listsubdirs(self, pat = None):\r
- files = os.listdir(os.curdir)\r
- files = filter(os.path.isdir, files)\r
- return self._filter(files, pat)\r
-\r
- def isdir(self, name):\r
- return os.path.isdir(name)\r
-\r
- def mkdir(self, name):\r
- os.mkdir(name, 0777)\r
-\r
- def rmdir(self, name):\r
- os.rmdir(name)\r
-\r
-\r
-class RCSProxyLocal(rcslib.RCS, DirSupport):\r
-\r
- def __init__(self):\r
- rcslib.RCS.__init__(self)\r
- DirSupport.__init__(self)\r
-\r
- def __del__(self):\r
- DirSupport.__del__(self)\r
- rcslib.RCS.__del__(self)\r
-\r
- def sumlist(self, list = None):\r
- return self._list(self.sum, list)\r
-\r
- def sumdict(self, list = None):\r
- return self._dict(self.sum, list)\r
-\r
- def sum(self, name_rev):\r
- f = self._open(name_rev)\r
- BUFFERSIZE = 1024*8\r
- sum = md5.new()\r
- while 1:\r
- buffer = f.read(BUFFERSIZE)\r
- if not buffer:\r
- break\r
- sum.update(buffer)\r
- self._closepipe(f)\r
- return sum.digest()\r
-\r
- def get(self, name_rev):\r
- f = self._open(name_rev)\r
- data = f.read()\r
- self._closepipe(f)\r
- return data\r
-\r
- def put(self, name_rev, data, message=None):\r
- name, rev = self._unmangle(name_rev)\r
- f = open(name, 'w')\r
- f.write(data)\r
- f.close()\r
- self.checkin(name_rev, message)\r
- self._remove(name)\r
-\r
- def _list(self, function, list = None):\r
- """INTERNAL: apply FUNCTION to all files in LIST.\r
-\r
- Return a list of the results.\r
-\r
- The list defaults to all files in the directory if None.\r
-\r
- """\r
- if list is None:\r
- list = self.listfiles()\r
- res = []\r
- for name in list:\r
- try:\r
- res.append((name, function(name)))\r
- except (os.error, IOError):\r
- res.append((name, None))\r
- return res\r
-\r
- def _dict(self, function, list = None):\r
- """INTERNAL: apply FUNCTION to all files in LIST.\r
-\r
- Return a dictionary mapping files to results.\r
-\r
- The list defaults to all files in the directory if None.\r
-\r
- """\r
- if list is None:\r
- list = self.listfiles()\r
- dict = {}\r
- for name in list:\r
- try:\r
- dict[name] = function(name)\r
- except (os.error, IOError):\r
- pass\r
- return dict\r
-\r
-\r
-class RCSProxyServer(RCSProxyLocal, server.SecureServer):\r
-\r
- def __init__(self, address, verbose = server.VERBOSE):\r
- RCSProxyLocal.__init__(self)\r
- server.SecureServer.__init__(self, address, verbose)\r
-\r
- def _close(self):\r
- server.SecureServer._close(self)\r
- RCSProxyLocal._close(self)\r
-\r
- def _serve(self):\r
- server.SecureServer._serve(self)\r
- # Retreat into start directory\r
- while self._dirstack: self.back()\r
-\r
-\r
-def test_server():\r
- import string\r
- import sys\r
- if sys.argv[1:]:\r
- port = string.atoi(sys.argv[1])\r
- else:\r
- port = 4127\r
- proxy = RCSProxyServer(('', port))\r
- proxy._serverloop()\r
-\r
-\r
-def test():\r
- import sys\r
- if not sys.argv[1:] or sys.argv[1] and sys.argv[1][0] in '0123456789':\r
- test_server()\r
- sys.exit(0)\r
- proxy = RCSProxyLocal()\r
- what = sys.argv[1]\r
- if hasattr(proxy, what):\r
- attr = getattr(proxy, what)\r
- if callable(attr):\r
- print apply(attr, tuple(sys.argv[2:]))\r
- else:\r
- print repr(attr)\r
- else:\r
- print "%s: no such attribute" % what\r
- sys.exit(2)\r
-\r
-\r
-if __name__ == '__main__':\r
- test()\r