"""CGI-savvy HTTP Server. This module builds on SimpleHTTPServer by implementing GET and POST requests to cgi-bin scripts. If the os.fork() function is not present (e.g. on Windows), os.popen2() is used as a fallback, with slightly altered semantics; if that function is not present either (e.g. on Macintosh), only Python scripts are supported, and they are executed by the current process. In all cases, the implementation is intentionally naive -- all requests are executed sychronously. SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL -- it may execute arbitrary Python code or external programs. Note that status code 200 is sent prior to execution of a CGI script, so scripts cannot send other status codes such as 302 (redirect). """ __version__ = "0.4" __all__ = ["CGIHTTPRequestHandler"] import os import sys import urllib import BaseHTTPServer import SimpleHTTPServer import select class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): """Complete HTTP server with GET, HEAD and POST commands. GET and HEAD also support running CGI scripts. The POST command is *only* implemented for CGI scripts. """ # Determine platform specifics have_fork = hasattr(os, 'fork') have_popen2 = hasattr(os, 'popen2') # Use of popen3 results in deadlock when a Python CGI fails, the # interpreter produces stderr, but this server is reading stdout first. # Commenting this means that the CGI won't have stderr, but can make its # own. # have_popen3 = hasattr(os, 'popen3') have_popen3 = False # Make rfile unbuffered -- we need to read one line and then pass # the rest to a subprocess, so we can't use buffered input. rbufsize = 0 def handle_one_request(self): """Handle a single HTTP request. You normally don't need to override this method; see the class __doc__ string for information on how to handle specific HTTP commands such as GET and POST. """ self.raw_requestline = self.rfile.readline() if not self.raw_requestline: self.close_connection = 1 return if not self.parse_request(): # An error code has been sent, just exit return # mod_rewrite goes here :) ru = self.path self.request_uri = ru host = self.headers.getheader('Host') self.path = '/cgi-bin/2.cgi' + ru mname = 'do_' + self.command if not hasattr(self, mname): self.send_error(501, "Unsupported method (%r)" % self.command) return method = getattr(self, mname) method() def do_POST(self): """Serve a POST request. This is only implemented for CGI scripts. """ if self.is_cgi(): self.run_cgi() else: self.send_error(501, "Can only POST to CGI scripts") def send_head(self): """Version of send_head that support CGI scripts""" if self.is_cgi(): return self.run_cgi() else: return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self) def is_cgi(self): """Test whether self.path corresponds to a CGI script, and return a boolean. This function sets self.cgi_info to a tuple (dir, rest) when it returns True, where dir is the directory part before the CGI script name. Note that rest begins with a slash if it is not empty. The default implementation tests whether the path begins with one of the strings in the list self.cgi_directories (and the next character is a '/' or the end of the string). """ path = self.path for x in self.cgi_directories: i = len(x) if path[:i] == x and (not path[i:] or path[i] == '/'): self.cgi_info = path[:i], path[i+1:] return True return False cgi_directories = ['/cgi-bin', '/htbin'] def is_executable(self, path): """Test whether argument path is an executable file.""" return executable(path, self) def is_python(self, path): """Test whether argument path is a Python script.""" return False head, tail = os.path.splitext(path) return tail.lower() in (".py", ".pyw") def make_cmdline( self, scriptfile, query ): cmdline = [ scriptfile ] if sys.platform == 'win32': fh = None try: fh = open( scriptfile, 'rb') lin = fh.readline() if lin[ 0:2 ] == b'#!': # unix shebang lin = fh.readline() if lin[ 0:2 ] == b'#!': # Windows shebang shebang = lin[ 2: ].strip().decode("UTF-8") cmdline = [ shebang ] + cmdline fh.close() except Exception: typ, val, trbk = sys.exc_info() import traceback self.log_error( "traceback %s", ''.join( traceback.format_exception( typ, val, trbk ))) if fh: fh.close() else: try: fun = getattr( server.CGIHTTPRequestHandler, 'make_cmdline') except AttributeError: pass else: return fun( self, scriptfile ) self.log_error("no 'make_cmdline' in CGIHTTPRequestHandler") return cmdline def _readerthread(self, fh, buffer): buffer.append(fh.read()) def _writerthread(self, fhr, fhw, length=None): if length is None: flag = True while flag: buf = fhr.read( 512 ) fhw.write( buf ) if len( buf ) == 0: flag = False else: while length > 0: buf = fhr.read( min( 512, length )) fhw.write( buf ) length -= len( buf ) # throw away additional data [see bug #427345] while select.select([fhr._sock], [], [], 0)[0]: if not fhr._sock.recv(1): break fhw.close() def run_cgi(self): """Execute a CGI script.""" path = self.path dir, rest = self.cgi_info i = path.find('/', len(dir) + 1) while i >= 0: nextdir = path[:i] nextrest = path[i+1:] scriptdir = self.translate_path(nextdir) if os.path.isdir(scriptdir): dir, rest = nextdir, nextrest i = path.find('/', len(dir) + 1) else: break # find an explicit query string, if present. i = rest.find('?') if i >= 0: rest, query = rest[:i], rest[i+1:] else: query = '' # dissect the part after the directory name into a script name & # a possible additional path, to be stored in PATH_INFO. i = rest.find('/') if i >= 0: script, rest = rest[:i], rest[i:] else: script, rest = rest, '' scriptname = dir + '/' + script scriptfile = self.translate_path(scriptname) if not os.path.exists(scriptfile): self.send_error(404, "No such CGI script (%r)" % scriptname) return if not os.path.isfile(scriptfile): self.send_error(403, "CGI script is not a plain file (%r)" % scriptname) return shebang = self.is_executable( scriptfile ) self.log_error("debug shebang: %s -- %s", scriptfile, shebang ) ispy = self.is_python(scriptname) if not ispy: if not (self.have_fork or self.have_popen2 or self.have_popen3): self.send_error(403, "CGI script is not a Python script (%r)" % scriptname) return if not shebang: self.send_error(403, "CGI script is not executable (%r)" % scriptname) return # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html # XXX Much of the following could be prepared ahead of time! env = {} env['SERVER_SOFTWARE'] = self.version_string() env['SERVER_NAME'] = self.server.server_name env['GATEWAY_INTERFACE'] = 'CGI/1.1' env['SERVER_PROTOCOL'] = self.protocol_version env['SERVER_PORT'] = str(self.server.server_port) env['REQUEST_METHOD'] = self.command uqrest = urllib.unquote(rest) env['PATH_INFO'] = uqrest env['PATH_TRANSLATED'] = self.translate_path(uqrest) env['SCRIPT_NAME'] = scriptname if query: env['QUERY_STRING'] = query host = self.address_string() if host != self.client_address[0]: env['REMOTE_HOST'] = host env['REMOTE_ADDR'] = self.client_address[0] env['REQUEST_URI'] = self.request_uri authorization = self.headers.getheader("authorization") if authorization: authorization = authorization.split() if len(authorization) == 2: import base64, binascii env['AUTH_TYPE'] = authorization[0] if authorization[0].lower() == "basic": try: authorization = base64.decodestring(authorization[1]) except binascii.Error: pass else: authorization = authorization.split(':') if len(authorization) == 2: env['REMOTE_USER'] = authorization[0] hoststring = self.headers.getheader("host") if hoststring: host, port = hoststring.split(':') env['HTTP_HOST'] = host env['HTTP_PORT'] = port env['HTTP_USER_AGENT'] = self.headers.getheader("user-agent") # XXX REMOTE_IDENT if self.headers.typeheader is None: env['CONTENT_TYPE'] = self.headers.type else: env['CONTENT_TYPE'] = self.headers.typeheader length = self.headers.getheader('content-length') if length: env['CONTENT_LENGTH'] = length referer = self.headers.getheader('referer') if referer: env['HTTP_REFERER'] = referer accept = [] for line in self.headers.getallmatchingheaders('accept'): if line[:1] in "\t\n\r ": accept.append(line.strip()) else: accept = accept + line[7:].split(',') env['HTTP_ACCEPT'] = ','.join(accept) ua = self.headers.getheader('user-agent') if ua: env['HTTP_USER_AGENT'] = ua co = filter(None, self.headers.getheaders('cookie')) if co: env['HTTP_COOKIE'] = ', '.join(co) # XXX Other HTTP_* headers # Since we're setting the env in the parent, provide empty # values to override previously set values for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH', 'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'): env.setdefault(k, "") os.environ.update(env) decoded_query = query.replace('+', ' ') if self.have_fork: self.send_response(200, "Script output follows") # Unix -- fork as we should args = [script] if '=' not in decoded_query: args.append(decoded_query) nobody = nobody_uid() self.wfile.flush() # Always flush before forking pid = os.fork() if pid != 0: # Parent pid, sts = os.waitpid(pid, 0) # throw away additional data [see bug #427345] while select.select([self.rfile], [], [], 0)[0]: if not self.rfile.read(1): break if sts: self.log_error("CGI script exit status %#x", sts) return # Child try: try: os.setuid(nobody) except os.error: pass os.dup2(self.rfile.fileno(), 0) os.dup2(self.wfile.fileno(), 1) os.execve(scriptfile, args, os.environ) except: self.server.handle_error(self.request, self.client_address) os._exit(127) else: # Non-Unix -- use subprocess import subprocess, threading, shutil cmdline = self.make_cmdline( scriptfile, query ) self.log_message("command: %s", subprocess.list2cmdline(cmdline)) try: nbytes = int(length) except (TypeError, ValueError): nbytes = 0 p = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stderr = [] stderr_thread = threading.Thread(target=self._readerthread, args=(p.stderr, stderr)) stderr_thread.daemon = True stderr_thread.start() self.log_message("writer: %s" % str( nbytes )) stdin_thread = threading.Thread(target=self._writerthread, args=(self.rfile, p.stdin, nbytes)) stdin_thread.daemon = True stdin_thread.start() statusline = p.stdout.readline() self.log_message( "type: %s line: %s" % ( str( type( statusline )), str( statusline ))) try: sl = statusline.split(':', 1 ) if sl[ 0 ] != b"Status" or len( sl ) != 2: self.send_response( 500, "Bad Status from CGI") self.wfile.write( statusline ) else: sl = sl[ 1 ].split( None, 1 ) code = int( sl[ 0 ]) if len( sl ) == 1: text = None else: text = sl[ 1 ] self.send_response( code, text ) except Exception: self.send_response( 500, "Bad Status from CGI") self.wfile.write( statusline ) shutil.copyfileobj( p.stdout, self.wfile) status = p.wait() stderr_thread.join() stdin_thread.join() p.stderr.close() p.stdout.close() if stderr: stderr = stderr[ 0 ].decode("UTF-8") if stderr: self.log_error('%s', stderr ) if status: self.log_error("CGI script exit status %#x", status) else: self.log_message("CGI script exited OK") return ##### files = popenx(cmdline, 'b') fi = files[0] fo = files[1] if self.have_popen3: fe = files[2] if self.command.lower() == "post" and nbytes > 0: data = self.rfile.read(nbytes) fi.write(data) # throw away additional data [see bug #427345] while select.select([self.rfile._sock], [], [], 0)[0]: if not self.rfile._sock.recv(1): break fi.close() statusline = fo.readline() try: sl = statusline.split(':', 1 ) if sl[ 0 ] != "Status" or len( sl ) != 2: raise ValueError sl = sl[ 1 ].split( None, 1 ) if len( sl ) == 1: text = None else: text = sl[ 1 ] code = int( sl[ 0 ]) except Exception: self.send_response( 500, "Bad Status from CGI") else: self.send_response( code, text ) shutil.copyfileobj(fo, self.wfile) if self.have_popen3: errors = fe.read() fe.close() if errors: self.log_message("%s", "errors from stderr") self.log_error('%s', errors) sts = fo.close() if sts: self.log_error("CGI script exit status %#x", sts) else: self.log_message("CGI script exited OK") nobody = None def nobody_uid(): """Internal routine to get nobody's uid""" global nobody if nobody: return nobody try: import pwd except ImportError: return -1 try: nobody = pwd.getpwnam('nobody')[2] except KeyError: nobody = 1 + max(map(lambda x: x[2], pwd.getpwall())) return nobody def executable(path, log): """Test for executable file.""" try: st = os.stat(path) except os.error: return False if sys.platform == 'win32': fh = None try: fh = open( path, "rb" ) lin = fh.readline() if lin[ 0:2 ] == '#!': # unix shebang lin = fh.readline() if lin[ 0:2 ] == '#!': # Glenn's Windows shebang: executable fh.close() return lin[ 2: ].strip() fh.close() except Exception: if log: import traceback typ, val, trbk = sys.exc_info() log.log_error("traceback %s", ''.join( traceback.format_exception( typ, val, trbk ))) if fh: fh.close() return False return False else: return st.st_mode & 73 != 0 # 73 = 0111 or 0o111 def test(HandlerClass = CGIHTTPRequestHandler, ServerClass = BaseHTTPServer.HTTPServer): SimpleHTTPServer.test(HandlerClass, ServerClass) if __name__ == '__main__': test()