+++ /dev/null
-"""CGI-savvy HTTP Server.\r
-\r
-This module builds on SimpleHTTPServer by implementing GET and POST\r
-requests to cgi-bin scripts.\r
-\r
-If the os.fork() function is not present (e.g. on Windows),\r
-os.popen2() is used as a fallback, with slightly altered semantics; if\r
-that function is not present either (e.g. on Macintosh), only Python\r
-scripts are supported, and they are executed by the current process.\r
-\r
-In all cases, the implementation is intentionally naive -- all\r
-requests are executed sychronously.\r
-\r
-SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL\r
--- it may execute arbitrary Python code or external programs.\r
-\r
-Note that status code 200 is sent prior to execution of a CGI script, so\r
-scripts cannot send other status codes such as 302 (redirect).\r
-"""\r
-\r
-\r
-__version__ = "0.4"\r
-\r
-__all__ = ["CGIHTTPRequestHandler"]\r
-\r
-import os\r
-import sys\r
-import urllib\r
-import BaseHTTPServer\r
-import SimpleHTTPServer\r
-import select\r
-import copy\r
-\r
-\r
-class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):\r
-\r
- """Complete HTTP server with GET, HEAD and POST commands.\r
-\r
- GET and HEAD also support running CGI scripts.\r
-\r
- The POST command is *only* implemented for CGI scripts.\r
-\r
- """\r
-\r
- # Determine platform specifics\r
- have_fork = hasattr(os, 'fork')\r
- have_popen2 = hasattr(os, 'popen2')\r
- have_popen3 = hasattr(os, 'popen3')\r
-\r
- # Make rfile unbuffered -- we need to read one line and then pass\r
- # the rest to a subprocess, so we can't use buffered input.\r
- rbufsize = 0\r
-\r
- def do_POST(self):\r
- """Serve a POST request.\r
-\r
- This is only implemented for CGI scripts.\r
-\r
- """\r
-\r
- if self.is_cgi():\r
- self.run_cgi()\r
- else:\r
- self.send_error(501, "Can only POST to CGI scripts")\r
-\r
- def send_head(self):\r
- """Version of send_head that support CGI scripts"""\r
- if self.is_cgi():\r
- return self.run_cgi()\r
- else:\r
- return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)\r
-\r
- def is_cgi(self):\r
- """Test whether self.path corresponds to a CGI script.\r
-\r
- Returns True and updates the cgi_info attribute to the tuple\r
- (dir, rest) if self.path requires running a CGI script.\r
- Returns False otherwise.\r
-\r
- If any exception is raised, the caller should assume that\r
- self.path was rejected as invalid and act accordingly.\r
-\r
- The default implementation tests whether the normalized url\r
- path begins with one of the strings in self.cgi_directories\r
- (and the next character is a '/' or the end of the string).\r
- """\r
- splitpath = _url_collapse_path_split(self.path)\r
- if splitpath[0] in self.cgi_directories:\r
- self.cgi_info = splitpath\r
- return True\r
- return False\r
-\r
- cgi_directories = ['/cgi-bin', '/htbin']\r
-\r
- def is_executable(self, path):\r
- """Test whether argument path is an executable file."""\r
- return executable(path)\r
-\r
- def is_python(self, path):\r
- """Test whether argument path is a Python script."""\r
- head, tail = os.path.splitext(path)\r
- return tail.lower() in (".py", ".pyw")\r
-\r
- def run_cgi(self):\r
- """Execute a CGI script."""\r
- path = self.path\r
- dir, rest = self.cgi_info\r
-\r
- i = path.find('/', len(dir) + 1)\r
- while i >= 0:\r
- nextdir = path[:i]\r
- nextrest = path[i+1:]\r
-\r
- scriptdir = self.translate_path(nextdir)\r
- if os.path.isdir(scriptdir):\r
- dir, rest = nextdir, nextrest\r
- i = path.find('/', len(dir) + 1)\r
- else:\r
- break\r
-\r
- # find an explicit query string, if present.\r
- i = rest.rfind('?')\r
- if i >= 0:\r
- rest, query = rest[:i], rest[i+1:]\r
- else:\r
- query = ''\r
-\r
- # dissect the part after the directory name into a script name &\r
- # a possible additional path, to be stored in PATH_INFO.\r
- i = rest.find('/')\r
- if i >= 0:\r
- script, rest = rest[:i], rest[i:]\r
- else:\r
- script, rest = rest, ''\r
-\r
- scriptname = dir + '/' + script\r
- scriptfile = self.translate_path(scriptname)\r
- if not os.path.exists(scriptfile):\r
- self.send_error(404, "No such CGI script (%r)" % scriptname)\r
- return\r
- if not os.path.isfile(scriptfile):\r
- self.send_error(403, "CGI script is not a plain file (%r)" %\r
- scriptname)\r
- return\r
- ispy = self.is_python(scriptname)\r
- if not ispy:\r
- if not (self.have_fork or self.have_popen2 or self.have_popen3):\r
- self.send_error(403, "CGI script is not a Python script (%r)" %\r
- scriptname)\r
- return\r
- if not self.is_executable(scriptfile):\r
- self.send_error(403, "CGI script is not executable (%r)" %\r
- scriptname)\r
- return\r
-\r
- # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html\r
- # XXX Much of the following could be prepared ahead of time!\r
- env = copy.deepcopy(os.environ)\r
- env['SERVER_SOFTWARE'] = self.version_string()\r
- env['SERVER_NAME'] = self.server.server_name\r
- env['GATEWAY_INTERFACE'] = 'CGI/1.1'\r
- env['SERVER_PROTOCOL'] = self.protocol_version\r
- env['SERVER_PORT'] = str(self.server.server_port)\r
- env['REQUEST_METHOD'] = self.command\r
- uqrest = urllib.unquote(rest)\r
- env['PATH_INFO'] = uqrest\r
- env['PATH_TRANSLATED'] = self.translate_path(uqrest)\r
- env['SCRIPT_NAME'] = scriptname\r
- if query:\r
- env['QUERY_STRING'] = query\r
- host = self.address_string()\r
- if host != self.client_address[0]:\r
- env['REMOTE_HOST'] = host\r
- env['REMOTE_ADDR'] = self.client_address[0]\r
- authorization = self.headers.getheader("authorization")\r
- if authorization:\r
- authorization = authorization.split()\r
- if len(authorization) == 2:\r
- import base64, binascii\r
- env['AUTH_TYPE'] = authorization[0]\r
- if authorization[0].lower() == "basic":\r
- try:\r
- authorization = base64.decodestring(authorization[1])\r
- except binascii.Error:\r
- pass\r
- else:\r
- authorization = authorization.split(':')\r
- if len(authorization) == 2:\r
- env['REMOTE_USER'] = authorization[0]\r
- # XXX REMOTE_IDENT\r
- if self.headers.typeheader is None:\r
- env['CONTENT_TYPE'] = self.headers.type\r
- else:\r
- env['CONTENT_TYPE'] = self.headers.typeheader\r
- length = self.headers.getheader('content-length')\r
- if length:\r
- env['CONTENT_LENGTH'] = length\r
- referer = self.headers.getheader('referer')\r
- if referer:\r
- env['HTTP_REFERER'] = referer\r
- accept = []\r
- for line in self.headers.getallmatchingheaders('accept'):\r
- if line[:1] in "\t\n\r ":\r
- accept.append(line.strip())\r
- else:\r
- accept = accept + line[7:].split(',')\r
- env['HTTP_ACCEPT'] = ','.join(accept)\r
- ua = self.headers.getheader('user-agent')\r
- if ua:\r
- env['HTTP_USER_AGENT'] = ua\r
- co = filter(None, self.headers.getheaders('cookie'))\r
- if co:\r
- env['HTTP_COOKIE'] = ', '.join(co)\r
- # XXX Other HTTP_* headers\r
- # Since we're setting the env in the parent, provide empty\r
- # values to override previously set values\r
- for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',\r
- 'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):\r
- env.setdefault(k, "")\r
-\r
- self.send_response(200, "Script output follows")\r
-\r
- decoded_query = query.replace('+', ' ')\r
-\r
- if self.have_fork:\r
- # Unix -- fork as we should\r
- args = [script]\r
- if '=' not in decoded_query:\r
- args.append(decoded_query)\r
- nobody = nobody_uid()\r
- self.wfile.flush() # Always flush before forking\r
- pid = os.fork()\r
- if pid != 0:\r
- # Parent\r
- pid, sts = os.waitpid(pid, 0)\r
- # throw away additional data [see bug #427345]\r
- while select.select([self.rfile], [], [], 0)[0]:\r
- if not self.rfile.read(1):\r
- break\r
- if sts:\r
- self.log_error("CGI script exit status %#x", sts)\r
- return\r
- # Child\r
- try:\r
- try:\r
- os.setuid(nobody)\r
- except os.error:\r
- pass\r
- os.dup2(self.rfile.fileno(), 0)\r
- os.dup2(self.wfile.fileno(), 1)\r
- os.execve(scriptfile, args, env)\r
- except:\r
- self.server.handle_error(self.request, self.client_address)\r
- os._exit(127)\r
-\r
- else:\r
- # Non Unix - use subprocess\r
- import subprocess\r
- cmdline = [scriptfile]\r
- if self.is_python(scriptfile):\r
- interp = sys.executable\r
- if interp.lower().endswith("w.exe"):\r
- # On Windows, use python.exe, not pythonw.exe\r
- interp = interp[:-5] + interp[-4:]\r
- cmdline = [interp, '-u'] + cmdline\r
- if '=' not in query:\r
- cmdline.append(query)\r
-\r
- self.log_message("command: %s", subprocess.list2cmdline(cmdline))\r
- try:\r
- nbytes = int(length)\r
- except (TypeError, ValueError):\r
- nbytes = 0\r
- p = subprocess.Popen(cmdline,\r
- stdin = subprocess.PIPE,\r
- stdout = subprocess.PIPE,\r
- stderr = subprocess.PIPE,\r
- env = env\r
- )\r
- if self.command.lower() == "post" and nbytes > 0:\r
- data = self.rfile.read(nbytes)\r
- else:\r
- data = None\r
- # throw away additional data [see bug #427345]\r
- while select.select([self.rfile._sock], [], [], 0)[0]:\r
- if not self.rfile._sock.recv(1):\r
- break\r
- stdout, stderr = p.communicate(data)\r
- self.wfile.write(stdout)\r
- if stderr:\r
- self.log_error('%s', stderr)\r
- p.stderr.close()\r
- p.stdout.close()\r
- status = p.returncode\r
- if status:\r
- self.log_error("CGI script exit status %#x", status)\r
- else:\r
- self.log_message("CGI script exited OK")\r
-\r
-\r
-# TODO(gregory.p.smith): Move this into an appropriate library.\r
-def _url_collapse_path_split(path):\r
- """\r
- Given a URL path, remove extra '/'s and '.' path elements and collapse\r
- any '..' references.\r
-\r
- Implements something akin to RFC-2396 5.2 step 6 to parse relative paths.\r
-\r
- Returns: A tuple of (head, tail) where tail is everything after the final /\r
- and head is everything before it. Head will always start with a '/' and,\r
- if it contains anything else, never have a trailing '/'.\r
-\r
- Raises: IndexError if too many '..' occur within the path.\r
- """\r
- # Similar to os.path.split(os.path.normpath(path)) but specific to URL\r
- # path semantics rather than local operating system semantics.\r
- path_parts = []\r
- for part in path.split('/'):\r
- if part == '.':\r
- path_parts.append('')\r
- else:\r
- path_parts.append(part)\r
- # Filter out blank non trailing parts before consuming the '..'.\r
- path_parts = [part for part in path_parts[:-1] if part] + path_parts[-1:]\r
- if path_parts:\r
- tail_part = path_parts.pop()\r
- else:\r
- tail_part = ''\r
- head_parts = []\r
- for part in path_parts:\r
- if part == '..':\r
- head_parts.pop()\r
- else:\r
- head_parts.append(part)\r
- if tail_part and tail_part == '..':\r
- head_parts.pop()\r
- tail_part = ''\r
- return ('/' + '/'.join(head_parts), tail_part)\r
-\r
-\r
-nobody = None\r
-\r
-def nobody_uid():\r
- """Internal routine to get nobody's uid"""\r
- global nobody\r
- if nobody:\r
- return nobody\r
- try:\r
- import pwd\r
- except ImportError:\r
- return -1\r
- try:\r
- nobody = pwd.getpwnam('nobody')[2]\r
- except KeyError:\r
- nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))\r
- return nobody\r
-\r
-\r
-def executable(path):\r
- """Test for executable file."""\r
- try:\r
- st = os.stat(path)\r
- except os.error:\r
- return False\r
- return st.st_mode & 0111 != 0\r
-\r
-\r
-def test(HandlerClass = CGIHTTPRequestHandler,\r
- ServerClass = BaseHTTPServer.HTTPServer):\r
- SimpleHTTPServer.test(HandlerClass, ServerClass)\r
-\r
-\r
-if __name__ == '__main__':\r
- test()\r