""" This is a stupid simple WSGI app to test which HTTP servers actually call the .close() method on iterable responses as they're supposed to. Also included are several run_* functions to launch the application with different WSGI servers. Run this script with 'python sleepy_app.py' and you'll be asked which server you want to try. After making your selection, try connecting to the server with curl: curl -N http://localhost:8000/ You should see a counter printed in your terminal, incrementing once every 10 seconds. Hit Ctrl-C on the curl window to disconnect the client. Then watch the server's output. If running with a WSGI-compliant server, you should see "SLEEPY CONNECTION CLOSE" printed to the terminal. """ class SleepyApp(object): def __init__(self, environ, start_response): self.environ = environ self.start_response = start_response def __iter__(self): self.start_response('200 OK', [('Content-type', 'text/plain')]) # print out one number every 10 seconds. import time # imported late for easier gevent patching counter = 0 while True: print "SLEEPY", counter yield str(counter) + '\n' counter += 1 time.sleep(10) def close(self): print "SLEEPY CONNECTION CLOSE" def run_wsgiref(): from wsgiref.simple_server import make_server httpd = make_server('', 8000, SleepyApp) print "Serving on port 8000..." httpd.serve_forever() def run_cherrypy(): import cherrypy cherrypy.tree.graft(SleepyApp) cherrypy.server.socket_host = '0.0.0.0' cherrypy.server.socket_port = 8000 cherrypy.engine.start() cherrypy.engine.block() def run_gevent(): from gevent.monkey import patch_all patch_all() from gevent.pywsgi import WSGIServer server = WSGIServer(('0.0.0.0', 8000), SleepyApp) print "Server running on port 0.0.0.0:8000. Ctrl+C to quit" server.serve_forever() def run_werkzeug(): from werkzeug.serving import run_simple run_simple('0.0.0.0', 8000, SleepyApp) def run_gunicorn_sync(): import subprocess subprocess.call([ 'gunicorn', '-w', '1', '--worker-class', 'sync', 'sleepy_app:SleepyApp', ]) def run_gunicorn_gevent(): import subprocess subprocess.call([ 'gunicorn', '-w', '1', '--worker-class', 'gevent', 'sleepy_app:SleepyApp', ]) if __name__ == '__main__': funcs = {} attrs = globals().copy() for a in attrs: if callable(attrs[a]) and a.startswith('run_'): funcs[a] = attrs[a] names = [n.replace('run_', '') for n in funcs] names.sort() def get_server(): print "You can run any of these servers:\n" + '\n'.join(names) + '\n' server = raw_input('Which server would you like to try?\n') if server not in names: return get_server() return server server = get_server() print "Running", server funcs['run_' + server]()