Index: Doc/library/asynchat.rst =================================================================== --- Doc/library/asynchat.rst (revision 62704) +++ Doc/library/asynchat.rst (working copy) @@ -81,6 +81,12 @@ :exc:`NotImplementedError` exception. +.. method:: async_chat._collect_incoming_data(data) + + Sample implementation of a data collection rutine to be used in conjunction + with :meth:`_get_data` in a user-specified :meth:`found_terminator`. + + .. method:: async_chat.discard_buffers() In emergencies this method will discard any data held in the input and/or @@ -95,6 +101,12 @@ should be available via an instance attribute. +.. method:: async_chat._get_data() + + Will return and clear the data received with the sample + :meth:`_collect_incoming_data` implementation. + + .. method:: async_chat.get_terminator() Returns the current terminator for the channel. Index: Doc/library/asyncore.rst =================================================================== --- Doc/library/asyncore.rst (revision 62704) +++ Doc/library/asyncore.rst (working copy) @@ -222,7 +222,21 @@ flushed). Sockets are automatically closed when they are garbage-collected. +.. class:: file_dispatcher() + A file_dispatcher takes a file descriptor or file object along with an optional + map argument and wraps it for use with the :cfunc:`poll`\ or :cfunc:`loop`\ + functions. If provided a file object or anything with a :cfunc:`fileno`\ + method, that method will be called and passed to the :class:`file_wrapper` + constructor. + Availability: UNIX +.. class::file_wrapper() + A file_wrapper takes an integer file descriptor and calls os.dup() to duplicate + the handle so that the original handle may be closed independently of the + file_wrapper. This class implements sufficient methods to emulate a socket for + use by the file_dispatcher class. + Availability: UNIX + .. _asyncore-example: asyncore Example basic HTTP client Index: Lib/asynchat.py =================================================================== --- Lib/asynchat.py (revision 62704) +++ Lib/asynchat.py (working copy) @@ -60,16 +60,35 @@ ac_out_buffer_size = 4096 def __init__ (self, conn=None): + # for string terminator matching self.ac_in_buffer = '' - self.ac_out_buffer = '' - self.producer_fifo = fifo() + + # we use a list here rather than cStringIO for a few reasons... + # del lst[:] is faster than sio.truncate(0) + # lst = [] is faster than sio.truncate(0) + # cStringIO will be gaining unicode support in py3k, which + # will negatively affect the performance of bytes compared to + # a ''.join() equivalent + self.incoming = [] + + # we toss the use of the "simple producer" and replace it with + # a pure deque, which the original fifo was a wrapping of + self.producer_fifo = deque() asyncore.dispatcher.__init__ (self, conn) def collect_incoming_data(self, data): - raise NotImplementedError, "must be implemented in subclass" + raise NotImplementedError("must be implemented in subclass") + + def _collect_incoming_data(self, data): + self.incoming.append(data) + + def _get_data(self): + d = ''.join(self.incoming) + del self.incoming[:] + return d def found_terminator(self): - raise NotImplementedError, "must be implemented in subclass" + raise NotImplementedError("must be implemented in subclass") def set_terminator (self, term): "Set the input delimiter. Can be a fixed string of any length, an integer, or None" @@ -96,7 +115,7 @@ # Continue to search for self.terminator in self.ac_in_buffer, # while calling self.collect_incoming_data. The while loop # is necessary because we might read several data+terminator - # combos with a single recv(1024). + # combos with a single recv(4096). while self.ac_in_buffer: lb = len(self.ac_in_buffer) @@ -150,88 +169,83 @@ self.ac_in_buffer = '' def handle_write (self): - self.initiate_send () + self.initiate_send() def handle_close (self): self.close() def push (self, data): - self.producer_fifo.push (simple_producer (data)) + sabs = self.ac_out_buffer_size + if len(data) > sabs: + for i in xrange(0, len(data), sabs): + self.producer_fifo.append(data[i:i+sabs]) + else: + self.producer_fifo.append(data) self.initiate_send() - + def push_with_producer (self, producer): - self.producer_fifo.push (producer) + self.producer_fifo.append(producer) self.initiate_send() - + def readable (self): "predicate for inclusion in the readable for select()" - return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + # cannot use the old predicate, it violates the claim of the + # set_terminator method. + + # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + return 1 def writable (self): "predicate for inclusion in the writable for select()" - # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) - # this is about twice as fast, though not as clear. - return not ( - (self.ac_out_buffer == '') and - self.producer_fifo.is_empty() and - self.connected - ) + return self.producer_fifo or (not self.connected) def close_when_done (self): "automatically close this channel once the outgoing queue is empty" - self.producer_fifo.push (None) + self.producer_fifo.append(None) - # refill the outgoing buffer by calling the more() method - # of the first producer in the queue - def refill_buffer (self): - while 1: - if len(self.producer_fifo): - p = self.producer_fifo.first() - # a 'None' in the producer fifo is a sentinel, - # telling us to close the channel. - if p is None: - if not self.ac_out_buffer: - self.producer_fifo.pop() - self.close() + def initiate_send(self): + while self.producer_fifo and self.connected: + first = self.producer_fifo[0] + # handle empty string/buffer or None entry + if not first: + del self.producer_fifo[0] + if first is None: + self.handle_close() return - elif isinstance(p, str): - self.producer_fifo.pop() - self.ac_out_buffer = self.ac_out_buffer + p - return - data = p.more() + + # handle classic producer behavior + obs = self.ac_out_buffer_size + try: + data = buffer(first, 0, obs) + except TypeError: + data = first.more() if data: - self.ac_out_buffer = self.ac_out_buffer + data - return + self.producer_fifo.appendleft(data) else: - self.producer_fifo.pop() - else: - return + del self.producer_fifo[0] + continue - def initiate_send (self): - obs = self.ac_out_buffer_size - # try to refill the buffer - if (len (self.ac_out_buffer) < obs): - self.refill_buffer() - - if self.ac_out_buffer and self.connected: - # try to send the buffer + # send the data try: - num_sent = self.send (self.ac_out_buffer[:obs]) - if num_sent: - self.ac_out_buffer = self.ac_out_buffer[num_sent:] - - except socket.error, why: + num_sent = self.send(data) + except socket.error: self.handle_error() return + if num_sent: + if num_sent < len(data) or obs < len(first): + self.producer_fifo[0] = first[num_sent:] + else: + del self.producer_fifo[0] + # we tried to send some actual data + return + def discard_buffers (self): # Emergencies only! self.ac_in_buffer = '' - self.ac_out_buffer = '' - while self.producer_fifo: - self.producer_fifo.pop() + del self.incoming[:] + self.producer_fifo.clear() - class simple_producer: def __init__ (self, data, buffer_size=512): Index: Lib/asyncore.py =================================================================== --- Lib/asyncore.py (revision 62704) +++ Lib/asyncore.py (working copy) @@ -53,20 +53,26 @@ import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ - ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode + ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode try: socket_map except NameError: socket_map = {} +def _strerror(err): + res = os.strerror(err) + if res == 'Unknown error': + res = errorcode[err] + return res + class ExitNow(Exception): pass def read(obj): try: obj.handle_read_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -74,15 +80,15 @@ def write(obj): try: obj.handle_write_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() -def _exception (obj): +def _exception(obj): try: obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -95,7 +101,7 @@ obj.handle_write_event() if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL): obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -116,14 +122,15 @@ e.append(fd) if [] == r == w == e: time.sleep(timeout) - else: - try: - r, w, e = select.select(r, w, e, timeout) - except select.error, err: - if err[0] != EINTR: - raise - else: - return + return + + try: + r, w, e = select.select(r, w, e, timeout) + except select.error, err: + if err[0] != EINTR: + raise + else: + return for fd in r: obj = map.get(fd) @@ -208,19 +215,30 @@ self._map = socket_map else: self._map = map - + + self._fileno = None + if sock: + # Set to nonblocking just to make sure for cases where we + # get a socket from a blocking source. + sock.setblocking(0) self.set_socket(sock, map) - # I think it should inherit this anyway - self.socket.setblocking(0) self.connected = True - # XXX Does the constructor require that the socket passed - # be connected? + # The constructor no longer requires that the socket + # passed be connected. try: self.addr = sock.getpeername() except socket.error: - # The addr isn't crucial - pass + if err[0] == ENOTCONN: + # To handle the case where we got an unconnected + # socket. + self.connected = False + else: + # The socket is broken in some unknown way, alert + # the user and remove it from the map (to prevent + # polling of broken sockets). + self.del_channel(map) + raise else: self.socket = None @@ -254,10 +272,9 @@ def create_socket(self, family, type): self.family_and_type = family, type - self.socket = socket.socket(family, type) - self.socket.setblocking(0) - self._fileno = self.socket.fileno() - self.add_channel() + sock = socket.socket(family, type) + sock.setblocking(0) + self.set_socket(sock) def set_socket(self, sock, map=None): self.socket = sock @@ -295,7 +312,7 @@ def listen(self, num): self.accepting = True if os.name == 'nt' and num > 5: - num = 1 + num = 5 return self.socket.listen(num) def bind(self, addr): @@ -310,10 +327,9 @@ return if err in (0, EISCONN): self.addr = address - self.connected = True - self.handle_connect() + self.handle_connect_event() else: - raise socket.error, (err, errorcode[err]) + raise socket.error(err, errorcode[err]) def accept(self): # XXX can return either an address pair or None @@ -333,9 +349,11 @@ except socket.error, why: if why[0] == EWOULDBLOCK: return 0 + elif why[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED): + self.handle_close() + return 0 else: raise - return 0 def recv(self, buffer_size): try: @@ -349,15 +367,21 @@ return data except socket.error, why: # winsock sometimes throws ENOTCONN - if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]: + if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]: self.handle_close() return '' else: raise def close(self): + self.connected = False + self.accepting = False self.del_channel() - self.socket.close() + try: + self.socket.close() + except socket.error, why: + if why[0] not in (ENOTCONN, EBADF): + raise # cheap inheritance, used to pass all other attribute # references to the underlying socket object. @@ -377,27 +401,53 @@ def handle_read_event(self): if self.accepting: - # for an accepting socket, getting a read implies - # that we are connected - if not self.connected: - self.connected = True + # accepting sockets are never connected, they "spawn" new + # sockets that are connected self.handle_accept() elif not self.connected: - self.handle_connect() - self.connected = True + self.handle_connect_event() self.handle_read() else: self.handle_read() + def handle_connect_event(self): + self.connected = True + self.handle_connect() + def handle_write_event(self): - # getting a write implies that we are connected + if self.accepting: + # Accepting sockets shouldn't get a write event. + # We will pretend it didn't happen. + return + if not self.connected: - self.handle_connect() - self.connected = True + #check for errors + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err != 0: + raise socket.error(err, strerror(err)) + + self.handle_connect_event() self.handle_write() def handle_expt_event(self): - self.handle_expt() + # if the handle_expt is the same default worthless method, + # we'll not even bother calling it, we'll instead generate + # a useful error + x = True + try: + y1 = self.__class__.handle_expt.im_func + y2 = dispatcher.handle_expt.im_func + x = y1 is y2 + except AttributeError: + pass + + if x: + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + msg = _strerror(err) + + raise socket.error(err, msg) + else: + self.handle_expt() def handle_error(self): nil, t, v, tbinfo = compact_traceback() @@ -473,7 +523,8 @@ def compact_traceback(): t, v, tb = sys.exc_info() tbinfo = [] - assert tb # Must have a traceback + if not tb: # Must have a traceback + raise AssertionError("traceback does not exist") while tb: tbinfo.append(( tb.tb_frame.f_code.co_filename, @@ -489,11 +540,22 @@ info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) return (file, function, line), t, v, info -def close_all(map=None): +def close_all(map=None, ignore_all=False): if map is None: map = socket_map for x in map.values(): - x.socket.close() + try: + x.close() + except OSError, x: + if x[0] == EBADF: + pass + elif not ignore_all: + raise + except (ExitNow, KeyboardInterrupt, SystemExit): + raise + except: + if not ignore_all: + raise map.clear() # Asynchronous File I/O: @@ -513,11 +575,12 @@ import fcntl class file_wrapper: - # here we override just enough to make a file + # Here we override just enough to make a file # look like a socket for the purposes of asyncore. + # The passed fd is automatically os.dup()'d def __init__(self, fd): - self.fd = fd + self.fd = os.dup(fd) def recv(self, *args): return os.read(self.fd, *args) @@ -539,6 +602,10 @@ def __init__(self, fd, map=None): dispatcher.__init__(self, None, map) self.connected = True + try: + fd = fd.fileno() + except AttributeError: + pass self.set_file(fd) # set it to non-blocking mode flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) Index: Lib/test/test_asyncore.py =================================================================== --- Lib/test/test_asyncore.py (revision 62704) +++ Lib/test/test_asyncore.py (working copy) @@ -27,6 +27,9 @@ def __init__(self): self.socket = dummysocket() + def close(self): + self.socket.close() + class exitingdummy: def __init__(self): pass