Index: Doc/library/asynchat.rst =================================================================== --- Doc/library/asynchat.rst (revision 59392) +++ Doc/library/asynchat.rst (working copy) @@ -30,11 +30,12 @@ The :class:`asyncore.dispatcher` methods can be used, although not all make sense in a message/response context. - Like :class:`asyncore.dispatcher`, :class:`async_chat` defines a set of - events that are generated by an analysis of socket conditions after a - :cfunc:`select` call. Once the polling loop has been started the - :class:`async_chat` object's methods are called by the event-processing - framework with no action on the part of the programmer. + Like :class:`asyncore.dispatcher`, :class:`async_chat` defines a + set of events that are generated by an analysis of socket + conditions after a :cfunc:`select` call. Once the polling loop has + been started the :class:`async_chat` object's methods are called by + the event-processing framework with no action on the part of the + programmer. Two class attributes can be modified, to improve performance, or possibly even to conserve memory. @@ -49,19 +50,30 @@ The asynchronous output buffer size (default ``4096``). - Unlike :class:`asyncore.dispatcher`, :class:`async_chat` allows you to - define a first-in-first-out queue (fifo) of *producers*. A producer need - have only one method, :meth:`more`, which should return data to be - transmitted on the channel. - The producer indicates exhaustion (*i.e.* that it contains no more data) by - having its :meth:`more` method return the empty string. At this point the - :class:`async_chat` object removes the producer from the fifo and starts - using the next producer, if any. When the producer fifo is empty the - :meth:`handle_write` method does nothing. You use the channel object's - :meth:`set_terminator` method to describe how to recognize the end of, or - an important breakpoint in, an incoming transmission from the remote - endpoint. + Unlike :class:`asyncore.dispatcher`, :class:`async_chat` allows you + to define a first-in-first-out queue (fifo) of data sources. Data + sources come in three types: First, they can by simple bytes or + string values, added to the fifo by the :meth:`push` and + :meth:`push_str` methods, respectively. Second, they can be + iterables such that [bytes(x) for x in iter(*iterable*)] is valid, + added to the fifo by :meth:`push_iterable`, Finally, they can be + *producers*, added to the fifo with :meth:`push_with_producer`. + Producers are supported for backwards compatibility, and are + internally adapted into iterators. A producer need have only one + method, :meth:`more`, which should return data to be transmitted on + the channel, or b'' when it is exhausted. + + When a data source is exhausted, the :class:`async_chat` object + removes it from the fifo and starts using the next source, if + any. When the fifo is empty the :meth:`handle_write` method does + nothing. + + + You use the channel object's :meth:`set_terminator` method to + describe how to recognize the end of, or an important breakpoint + in, an incoming transmission from the remote endpoint. + To build a functioning :class:`async_chat` subclass your input methods :meth:`collect_incoming_data` and :meth:`found_terminator` must handle the data that the channel receives asynchronously. The methods are described @@ -70,17 +82,25 @@ .. method:: async_chat.close_when_done() - Pushes a ``None`` on to the producer fifo. When this producer is popped off + Pushes a ``None`` on to the source fifo. When this is popped off the fifo it causes the channel to be closed. .. method:: async_chat.collect_incoming_data(data) - Called with *data* holding an arbitrary amount of received data. The - default method, which must be overridden, raises a - :exc:`NotImplementedError` exception. + Called with *data* holding an arbitrary amount of received data in + the form of a bytes object. The default method stores the data in + an internal buffer, accesible by :meth:`get_data`. +.. method:: async_chat.get_data() + + Retrieves the data buffered by the default + :meth:`collect_incoming_data` since the last call to get_data, or + since the creation of the async_chat object if get_data has not + previously been called. + + .. method:: async_chat.discard_buffers() In emergencies this method will discard any data held in the input and/or @@ -89,10 +109,10 @@ .. method:: async_chat.found_terminator() - Called when the incoming data stream matches the termination condition set - by :meth:`set_terminator`. The default method, which must be overridden, - raises a :exc:`NotImplementedError` exception. The buffered input data - should be available via an instance attribute. + Called when the incoming data stream matches the termination + condition set by :meth:`set_terminator` or + :meth:`set_terminator_str`. The default method, which must be + overridden, raises a :exc:`NotImplementedError` exception. .. method:: async_chat.get_terminator() @@ -119,26 +139,46 @@ .. method:: async_chat.handle_write() - Called when the application may write data to the channel. The default - method calls the :meth:`initiate_send` method, which in turn will call - :meth:`refill_buffer` to collect data from the producer fifo associated - with the channel. + Called when the application may write data to the channel. The + default method calls the :meth:`initiate_send` method, which in + turn retrieves data from the source fifo and transmsits it through + the socket. .. method:: async_chat.push(data) - Creates a :class:`simple_producer` object (*see below*) containing the data - and pushes it on to the channel's ``producer_fifo`` to ensure its - transmission. This is all you need to do to have the channel write the - data out to the network, although it is possible to use your own producers - in more complex schemes to implement encryption and chunking, for example. + Pushes the passed data on to the channel's source fifo to ensure + its transmission, where *data* can be interpreted as a bytes + object. This is all you need to do to have the channel write the + data out to the network. +.. method:: async_chat.push_str(data, encoding) + + Pushes the passed data on to the channel's source fifo to ensure + its transmission, where *data* can be encoded into a bytes object + via the specified encoding. This is all you need to do to have the + channel write the data out to the network. + + It is wise to be aware the of endian behavior of your chosen + encoding, especially when communicating between computers of + different architectures. + + +.. method:: async_chat.push_iterable(iterable) + + Pushes the passed iterable onto the channel's source fifo to ensure + its transmission, where *iterable* results in a series of bytes + objects. This is all you need to do to have the channel write the + data out to the network. + + .. method:: async_chat.push_with_producer(producer) - Takes a producer object and adds it to the producer fifo associated with - the channel. When all currently-pushed producers have been exhausted the - channel will consume this producer's data by calling its :meth:`more` + Takes a producer object, adapts it into an iterator, and adds it to + the source fifo associated with the channel. When all + currently-pushed sources have been exhausted the channel will + consume this producer's data by repeatedly calling its :meth:`more` method and send the data to the remote endpoint. @@ -148,14 +188,6 @@ channels tested by the :cfunc:`select` loop for readability. -.. method:: async_chat.refill_buffer() - - Refills the output buffer by calling the :meth:`more` method of the - producer at the head of the fifo. If it is exhausted then the producer is - popped off the fifo and the next producer is activated. If the current - producer is, or becomes, ``None`` then the channel is closed. - - .. method:: async_chat.set_terminator(term) Sets the terminating condition to be recognized on the channel. ``term`` @@ -165,11 +197,11 @@ +-----------+---------------------------------------------+ | term | Description | +===========+=============================================+ - | *string* | Will call :meth:`found_terminator` when the | - | | string is found in the input stream | + | *bytes* | Will call :meth:`found_terminator` when the | + | | byte sequence is found in the input stream | +-----------+---------------------------------------------+ | *integer* | Will call :meth:`found_terminator` when the | - | | indicated number of characters have been | + | | indicated number of bytes have been | | | received | +-----------+---------------------------------------------+ | ``None`` | The channel continues to collect data | @@ -180,6 +212,18 @@ by the channel after :meth:`found_terminator` is called. +.. method:: async_chat.set_terminator_str(term, encoding) + + Sets the terminating condition to be recognized on the channel. + ``term`` is encoded into a bytes object using the ``encoding``, + after which it is treated just as a bytes object passed to + :meth:`set_terminator`. + + It is wise to be aware the of endian behavior of your chosen + encoding, especially when communicating between computers of + different architectures. + + .. method:: async_chat.writable() Should return ``True`` as long as items remain on the producer fifo, or the @@ -203,36 +247,6 @@ empty string. -.. class:: fifo([list=None]) - - Each channel maintains a :class:`fifo` holding data which has been pushed - by the application but not yet popped for writing to the channel. A - :class:`fifo` is a list used to hold data and/or producers until they are - required. If the *list* argument is provided then it should contain - producers or data items to be written to the channel. - - -.. method:: fifo.is_empty() - - Returns ``True`` if and only if the fifo is empty. - - -.. method:: fifo.first() - - Returns the least-recently :meth:`push`\ ed item from the fifo. - - -.. method:: fifo.push(data) - - Adds the given data (which may be a string or a producer object) to the - producer fifo. - - -.. method:: fifo.pop() - - If the fifo is not empty, returns ``True, first()``, deleting the popped - item. Returns ``False, None`` for an empty fifo. - The :mod:`asynchat` module also defines one utility function, which may be of use in network and textual analysis operations. @@ -270,25 +284,18 @@ asynchat.async_chat.__init__(self, conn=conn) self.addr = addr self.sessions = sessions - self.ibuffer = [] - self.obuffer = "" - self.set_terminator("\r\n\r\n") + self.set_terminator(b"\r\n\r\n") self.reading_headers = True self.handling = False self.cgi_data = None self.log = log - def collect_incoming_data(self, data): - """Buffer the data""" - self.ibuffer.append(data) - def found_terminator(self): if self.reading_headers: self.reading_headers = False - self.parse_headers("".join(self.ibuffer)) - self.ibuffer = [] - if self.op.upper() == "POST": - clen = self.headers.getheader("content-length") + self.parse_headers(self.get_data()) + if self.op.upper() == b"POST": + clen = self.headers.getheader(b"content-length") self.set_terminator(int(clen)) else: self.handling = True @@ -296,7 +303,6 @@ self.handle_request() elif not self.handling: self.set_terminator(None) # browsers sometimes over-send - self.cgi_data = parse(self.headers, "".join(self.ibuffer)) + self.cgi_data = parse(self.headers, self.get_data()) self.handling = True - self.ibuffer = [] self.handle_request() Index: Doc/library/asyncore.rst =================================================================== --- Doc/library/asyncore.rst (revision 59392) +++ Doc/library/asyncore.rst (working copy) @@ -119,8 +119,10 @@ .. method:: dispatcher.handle_expt() - Called when there is out of band (OOB) data for a socket connection. This - will almost never happen, as OOB is tenuously supported and rarely used. + Called when there is out of band (OOB) data or other exceptional + conditions for a socket connection. This will almost never happen, + as OOB is tenuously supported and rarely used. The default behavior + is to raise a socket.error describing the condition. .. method:: dispatcher.handle_connect() @@ -143,9 +145,9 @@ .. method:: dispatcher.handle_accept() - Called on listening channels (passive openers) when a connection can be - established with a new remote endpoint that has issued a :meth:`connect` - call for the local endpoint. + Called on listening channels (passive openers) when a connection + can be established with a new remote endpoint that has issued a + :meth:`connect` call for the local endpoint. .. method:: dispatcher.readable() Index: Lib/asynchat.py =================================================================== --- Lib/asynchat.py (revision 59392) +++ Lib/asynchat.py (working copy) @@ -32,18 +32,17 @@ the common internet protocols - smtp, nntp, ftp, etc..). The handle_read() method looks at the input stream for the current -'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' -for multi-line output), calling self.found_terminator() on its -receipt. +'terminator', calling self.found_terminator() on its receipt. for example: -Say you build an async nntp client using this class. At the start -of the connection, you'll have self.terminator set to '\r\n', in -order to process the single-line greeting. Just before issuing a -'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST -command will be accumulated (using your own 'collect_incoming_data' -method) up to the terminator, and then control will be returned to -you - by calling your self.found_terminator() method. +Say you build an async nntp client using this class. At the start of +the connection, you'll have called self.set_terminator(b'\r\n'), in +order to process the single-line greeting. Just before issuing a LIST +command you'll set it to b'\r\n.\r\n'. The output of the LIST command +will be accumulated (using your own 'collect_incoming_data' method) up +to the terminator, and then control will be returned to you - by +calling your self.found_terminator() method. + """ import sys @@ -52,31 +51,155 @@ from collections import deque class async_chat (asyncore.dispatcher): - """This is an abstract class. You must derive from this class, and add - the two methods collect_incoming_data() and found_terminator()""" + """This is an abstract class. You must derive from this class, + and add the `found_terminator` method. Children may also override + `collect_incoming_data` to override the default buffering of + incoming data. + + The set_terminator(term) or set_terminator_str(term, encoding) + method almost always should be called before entering + asyncore.loop. + + In addition to overriding methods, children may override the + variables `ac_in_buffer_size` and `ac_out_buffer_size`, which + provide hints regarding the number of bytes that may be stored in + the incoming and outgoing buffers, respectively. Changing these + will not affect functionality, but may alter performance. The + default value of each is 4096. + + """ + # these are overridable defaults ac_in_buffer_size = 4096 ac_out_buffer_size = 4096 def __init__ (self, conn=None): - self.ac_in_buffer = b'' - self.ac_out_buffer = b'' - self.producer_fifo = fifo() + """Constructor, should be called by child's __init__ + + Optional arguments: + conn -- An already initialized socket. + + conn, if provided and not None, will be used as the socket for + this channel. If conn is provided, create_socket and its + related methods need not be called. + + """ + + # for terminator matching + self.__in_buffer = b'' + # we use a list here rather than cStringIO for a few reasons... + # del lst[:] is faster than sio.truncate(0) + # lst = [] is faster than sio.truncate(0) + # cStringIO will be gaining unicode support in py3k, which + # will negatively affect the performance of bytes compared to + # a ''.join() equivalent + self.__incoming = [] + + # we toss the use of the "simple producer" and replace it with + # a pure deque, which the original fifo was a wrapping of + self.__producer_fifo = deque() + asyncore.dispatcher.__init__ (self, conn) def collect_incoming_data(self, data): - raise NotImplementedError("must be implemented in subclass") + """Store received data for later processing + Arguments: + data -- A bytes object representing some received data. + + collect_incoming_data may be overridden in a child class. The + default implementation simply records incoming data, making it + accessible via the `get_data` method; `found_terminator` is + the proper place to do processing in most cases. + + """ + self.__incoming.append(data) + + def get_data(self): + """Retrieve the data received so far. Returns a bytes object. + + This method is tied to the default collect_incoming_data + implementation. It retrieves and empties the buffer that is + filled by the default collect_incoming_data. + + """ + d = b''.join(self.__incoming) + del self.__incoming[:] + return d + def found_terminator(self): - raise NotImplementedError("must be implemented in subclass") + """Process a complete section of data received from the socket + found_terminator must be overridden in a child class for + async_chat to be useful. It is called when the terminator is + found in the received data stream, after calling + `collect_incoming_data` with all bytes prior to the terminator + in the stream. The terminator is defined by the most recent + call to `set_terminator` or `set_terminator_str`. If + `collect_incoming_data` has not been overridden, get_data() + will return the data received up to the terminator. + + + """ + raise NotImplementedError("found_terminator must be implemented in subclass") + def set_terminator (self, term): - "Set the input delimiter. Can be a fixed string of any length, an integer, or None" - self.terminator = term + """Set the input delimiter. + Arguments: + term -- Can be a bytes object, an integer, or None. + + If term is None, async_chat will never call + found_terminator. This is occasionally useful, if + `collect_incoming_data` performs all necessary handling of the + data. + + If term is an int object, async_chat will read that many bytes + from the data stream, and then call `found_terminator`. This + is useful for reading protocol sections of known length. + + If term is neither an int nor None, it will be passed to the + bytes object constructor. async_chat will look for that + sequence of bytes in the incoming data stream, and call + `found_terminator` when it is discovered. This is useful for + reading protocol sections of varying or unknown length. + + set_terminator may be called as many times as you like. This + is quite often useful -- for example in HTTP you might read + the headers by doing set_terminator(b'\n\n') and then extract + the content length from the headers and do + set_terminator(content_length) to read the page data. + + """ + + if term is None: + self.terminator = None + elif isinstance(term, int): + self.terminator = term + else: + self.terminator = bytes(term) + + def set_terminator_str(self, term, encoding): + """Set the input delimiter to an encoded string + + Arguments: + term -- A str object. + encoding -- The name of a codec. + + This method is like `set_terminator`, except that it sets the + terminator to a sequence of bytes representing term in the + given encoding. + + It is very important to be aware of byte ordering when using + this function. Some encodings produce endian-sentive results. + + """ + self.set_terminator(term.encode(encoding)) + def get_terminator (self): + """Return the current terminator""" return self.terminator # grab some more data from the socket, @@ -85,39 +208,39 @@ # if found, transition to the next state. def handle_read (self): + """asyncore function. Overriding children must invoke.""" try: - data = self.recv (self.ac_in_buffer_size) - except socket.error as why: + data = self.recv(self.ac_in_buffer_size) + except socket.error: self.handle_error() return - if isinstance(data, str): - data = data.encode('ascii') - self.ac_in_buffer = self.ac_in_buffer + bytes(data) + data = bytes(data) + self.__in_buffer = self.__in_buffer + data - # Continue to search for self.terminator in self.ac_in_buffer, + # Continue to search for self.terminator in self.__in_buffer, # while calling self.collect_incoming_data. The while loop # is necessary because we might read several data+terminator - # combos with a single recv(1024). + # combos with a single recv(4096). - while self.ac_in_buffer: - lb = len(self.ac_in_buffer) + while self.__in_buffer: + lb = len(self.__in_buffer) terminator = self.get_terminator() if not terminator: # no terminator, collect it all - self.collect_incoming_data (self.ac_in_buffer) - self.ac_in_buffer = b'' - elif isinstance(terminator, int) or isinstance(terminator, int): + self.collect_incoming_data (self.__in_buffer) + self.__in_buffer = b'' + elif isinstance(terminator, int): # numeric terminator n = terminator if lb < n: - self.collect_incoming_data (self.ac_in_buffer) - self.ac_in_buffer = b'' + self.collect_incoming_data (self.__in_buffer) + self.__in_buffer = b'' self.terminator = self.terminator - lb else: - self.collect_incoming_data (self.ac_in_buffer[:n]) - self.ac_in_buffer = self.ac_in_buffer[n:] + self.collect_incoming_data (self.__in_buffer[:n]) + self.__in_buffer = self.__in_buffer[n:] self.terminator = 0 self.found_terminator() else: @@ -129,122 +252,206 @@ # 3) end of buffer does not match any prefix: # collect data terminator_len = len(terminator) - if isinstance(terminator, str): - terminator = terminator.encode('ascii') - index = self.ac_in_buffer.find(terminator) + index = self.__in_buffer.find(terminator) if index != -1: # we found the terminator if index > 0: # don't bother reporting the empty string (source of subtle bugs) - self.collect_incoming_data (self.ac_in_buffer[:index]) - self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] + self.collect_incoming_data (self.__in_buffer[:index]) + self.__in_buffer = self.__in_buffer[index+terminator_len:] # This does the Right Thing if the terminator is changed here. self.found_terminator() else: # check for a prefix of the terminator - index = find_prefix_at_end (self.ac_in_buffer, terminator) + index = find_prefix_at_end (self.__in_buffer, terminator) if index: if index != lb: # we found a prefix, collect up to the prefix - self.collect_incoming_data (self.ac_in_buffer[:-index]) - self.ac_in_buffer = self.ac_in_buffer[-index:] + self.collect_incoming_data (self.__in_buffer[:-index]) + self.__in_buffer = self.__in_buffer[-index:] break else: # no prefix, collect it all - self.collect_incoming_data (self.ac_in_buffer) - self.ac_in_buffer = b'' + self.collect_incoming_data (self.__in_buffer) + self.__in_buffer = b'' def handle_write (self): - self.initiate_send () + """asyncore function. Overriding children must invoke.""" + self.initiate_send() def handle_close (self): + """asyncore function. Overriding children must invoke.""" self.close() def push (self, data): - self.producer_fifo.push (simple_producer (data)) + """Place data in the outgoing queue. + + Arguments: + data -- The expression bytes(data) must be meaningful + + Appends bytes(data) to the sequence of bytes to be sent + through the socket. Each time asyncore.loop discovers that + the socket is writable, some or all of the bytes in that queue + will be transmitted. + + """ + sabs = self.ac_out_buffer_size + data = bytes(data) + if len(data) > sabs: + for i in xrange(0, len(data), sabs): + self.__producer_fifo.append(data[i:i+sabs]) + else: + self.__producer_fifo.append(data) self.initiate_send() - def push_with_producer (self, producer): - self.producer_fifo.push (producer) + def push_str(self, data, encoding): + """Place a string in the outgoing queue. + + Arguments: + data -- The expression data.encode(encoding) must be meaningful + + Appends bytes(data.encode(encoding)) to the sequence of bytes + to be sent through the socket. Each time asyncore.loop + discovers that the socket is writable, some or all of the + bytes in that queue will be transmitted. + + It is very important to be aware of byte ordering when using + this function. Some encodings produce endian-sentive results. + + """ + self.push(data.encode(encoding)) + + def push_iterable(self, iterable): + """Place the iterated items in the outgoing queue + + Arguments: + iterable -- [bytes(x) for x in iterable] must be meaningful + + Appends bytes(x) for each x in iterator to the sequence of + bytes to be sent through the socket. Each time asyncore.loop + discovers that the socket is writable, some or all of the + bytes in that queue will be transmitted. + + """ + self.__producer_fifo.append(iter(iterable)) self.initiate_send() + def push_with_producer (self, producer): + """Place a producer in the outgoing queue. + + Arguments: + producer -- The expression producer.more() must be meaningful + + Repeatedly appends bytes(producer.more()) to the sequence of + bytes to be sent through the socket, until + bool(producer.more()) is False. Each time asyncore.loop + discovers that the socket is writable, some or all of the + bytes in that queue will be transmitted. + + """ + def adapter(prod): + while True: + data = prod.more() + if data == b'': + raise StopIteration + yield data + + self.push_iterable(adapter(producer)) + def readable (self): "predicate for inclusion in the readable for select()" - return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + # cannot use the old predicate, it violates the claim of the + # set_terminator method. + # return (len(self.__in_buffer) <= self.ac_in_buffer_size) + return 1 + def writable (self): "predicate for inclusion in the writable for select()" - # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) - # this is about twice as fast, though not as clear. - return not ( - (self.ac_out_buffer == b'') and - self.producer_fifo.is_empty() and - self.connected - ) + return self.__producer_fifo or (not self.connected) def close_when_done (self): "automatically close this channel once the outgoing queue is empty" - self.producer_fifo.push (None) + self.__producer_fifo.append(None) - # refill the outgoing buffer by calling the more() method - # of the first producer in the queue - def refill_buffer (self): - while 1: - if len(self.producer_fifo): - p = self.producer_fifo.first() - # a 'None' in the producer fifo is a sentinel, - # telling us to close the channel. - if p is None: - if not self.ac_out_buffer: - self.producer_fifo.pop() - self.close() - return - elif isinstance(p, str) or isinstance(p, bytes): - if isinstance(p, str): - p = p.encode('ascii') - self.producer_fifo.pop() - self.ac_out_buffer = self.ac_out_buffer + p - return - data = p.more() - if data: - if isinstance(data, str): - data = data.encode('ascii') - self.ac_out_buffer = self.ac_out_buffer + bytes(data) - return - else: - self.producer_fifo.pop() - else: + def initiate_send (self): + """Internal function. Do not invoke or override.""" + while self.__producer_fifo and self.connected: + source = self.__producer_fifo.popleft() + + # Handle None entry. + if source is None: + self.handle_close() return - def initiate_send (self): - obs = self.ac_out_buffer_size - # try to refill the buffer - if (len (self.ac_out_buffer) < obs): - self.refill_buffer() + # Handle empty entries. + if not source: + continue - if self.ac_out_buffer and self.connected: - # try to send the buffer try: - num_sent = self.send (self.ac_out_buffer[:obs]) - if num_sent: - self.ac_out_buffer = self.ac_out_buffer[num_sent:] + data = source.__next__() + self.__producer_fifo.appendleft(source) + except StopIteration: + continue + except AttributeError: + data = source - except socket.error as why: + data = bytes(data) + + # Send the data. + try: + num_sent = self.send(data[:self.ac_out_buffer_size]) + except socket.error: self.handle_error() return + if num_sent: + if num_sent < len(data): + self.__producer_fifo.appendleft(data[num_sent:]) + + # We tried to send some actual data. + return + def discard_buffers (self): + """Discard all incoming and outgoing data. + + It is generally wise to avoid the use of this function, as it + results in data loss. This is rarely useful. + + """ # Emergencies only! - self.ac_in_buffer = b'' - self.ac_out_buffer = b'' - while self.producer_fifo: - self.producer_fifo.pop() + self.__in_buffer = b'' + del self.__incoming[:] + self.__producer_fifo.clear() - class simple_producer: + """Produce bytes for transmission. + + This is a very basic producer; the only extra functionality it + provides over async_chat.push(bytes) is to break the data up into + blocks of arbitrary size, such that each call to more produces a + block of data no larger than the requested size. Even that is not + very useful, since setting async_chat.ac_out_buffer_size achieves + the same thing. + + """ + def __init__ (self, data, buffer_size=512): - self.data = data + """Initialize the producer with a byte sequence + + Arguments: + data -- bytes(data) must be meaningful. + + Optional arguments: + buffer_size -- A number, defaults to 512 + + The simple_producer will pop and return blocks of `data` of + length min(`buffer_size`, len(`data`)) until data is empty. + + """ + + self.data = bytes(data) self.buffer_size = buffer_size def more (self): @@ -257,31 +464,6 @@ self.data = b'' return result -class fifo: - def __init__ (self, list=None): - if not list: - self.list = deque() - else: - self.list = deque(list) - - def __len__ (self): - return len(self.list) - - def is_empty (self): - return not self.list - - def first (self): - return self.list[0] - - def push (self, data): - self.list.append(data) - - def pop (self): - if self.list: - return (1, self.list.popleft()) - else: - return (0, None) - # Given 'haystack', see if any prefix of 'needle' is at its end. This # assumes an exact match has already been checked. Return the number of # characters matched. @@ -298,6 +480,17 @@ # regex: 14035/s def find_prefix_at_end (haystack, needle): + """Returns > 0 if the beginning of needle is at the end of haystack + + Arguments: + haystack -- A bytes object. + needle -- A bytes object. + + Given haystack, see if any prefix of needle is at its end. This + assumes an exact match has already been checked. Return the + number of characters matched. + + """ l = len(needle) - 1 while l and not haystack.endswith(needle[:l]): l -= 1 Index: Lib/smtpd.py =================================================================== --- Lib/smtpd.py (revision 59392) +++ Lib/smtpd.py (working copy) @@ -123,12 +123,12 @@ self.__fqdn = socket.getfqdn() self.__peer = conn.getpeername() print('Peer:', repr(self.__peer), file=DEBUGSTREAM) - self.push('220 %s %s' % (self.__fqdn, __version__)) - self.set_terminator('\r\n') + self.push_str('220 %s %s' % (self.__fqdn, __version__), 'ascii') + self.set_terminator(b'\r\n') # Overrides base class for convenience def push(self, msg): - asynchat.async_chat.push(self, msg + '\r\n') + asynchat.async_chat.push(self, msg + b'\r\n') # Implementation of base class abstract method def collect_incoming_data(self, data): @@ -141,7 +141,7 @@ self.__line = [] if self.__state == self.COMMAND: if not line: - self.push('500 Error: bad syntax') + self.push_str('500 Error: bad syntax', 'ascii') return method = None i = line.find(' ') @@ -153,13 +153,13 @@ arg = line[i+1:].strip() method = getattr(self, 'smtp_' + command, None) if not method: - self.push('502 Error: command "%s" not implemented' % command) + self.push_str('502 Error: command "%s" not implemented' % command, 'ascii') return method(arg) return else: if self.__state != self.DATA: - self.push('451 Internal confusion') + self.push_str('451 Internal confusion', 'ascii') return # Remove extraneous carriage returns and de-transparency according # to RFC 821, Section 4.5.2. @@ -177,32 +177,32 @@ self.__rcpttos = [] self.__mailfrom = None self.__state = self.COMMAND - self.set_terminator('\r\n') + self.set_terminator(b'\r\n') if not status: - self.push('250 Ok') + self.push_str('250 Ok', 'ascii') else: - self.push(status) + self.push_str(status, 'ascii') # SMTP and ESMTP commands def smtp_HELO(self, arg): if not arg: - self.push('501 Syntax: HELO hostname') + self.push_str('501 Syntax: HELO hostname', 'ascii') return if self.__greeting: - self.push('503 Duplicate HELO/EHLO') + self.push_str('503 Duplicate HELO/EHLO', 'ascii') else: self.__greeting = arg - self.push('250 %s' % self.__fqdn) + self.push_str('250 %s' % self.__fqdn, 'ascii') def smtp_NOOP(self, arg): if arg: - self.push('501 Syntax: NOOP') + self.push_str('501 Syntax: NOOP', 'ascii') else: - self.push('250 Ok') + self.push_str('250 Ok', 'ascii') def smtp_QUIT(self, arg): # args is ignored - self.push('221 Bye') + self.push_str('221 Bye', 'ascii') self.close_when_done() # factored @@ -223,49 +223,49 @@ print('===> MAIL', arg, file=DEBUGSTREAM) address = self.__getaddr('FROM:', arg) if arg else None if not address: - self.push('501 Syntax: MAIL FROM:
') + self.push_str('501 Syntax: MAIL FROM:
', 'ascii') return if self.__mailfrom: - self.push('503 Error: nested MAIL command') + self.push_str('503 Error: nested MAIL command', 'ascii') return self.__mailfrom = address print('sender:', self.__mailfrom, file=DEBUGSTREAM) - self.push('250 Ok') + self.push_str('250 Ok', 'ascii') def smtp_RCPT(self, arg): print('===> RCPT', arg, file=DEBUGSTREAM) if not self.__mailfrom: - self.push('503 Error: need MAIL command') + self.push_str('503 Error: need MAIL command', 'ascii') return address = self.__getaddr('TO:', arg) if arg else None if not address: - self.push('501 Syntax: RCPT TO:
') + self.push_str('501 Syntax: RCPT TO:
', 'ascii') return self.__rcpttos.append(address) print('recips:', self.__rcpttos, file=DEBUGSTREAM) - self.push('250 Ok') + self.push_str('250 Ok', 'ascii') def smtp_RSET(self, arg): if arg: - self.push('501 Syntax: RSET') + self.push_str('501 Syntax: RSET', 'ascii') return # Resets the sender, recipients, and data, but not the greeting self.__mailfrom = None self.__rcpttos = [] self.__data = '' self.__state = self.COMMAND - self.push('250 Ok') + self.push_str('250 Ok', 'ascii') def smtp_DATA(self, arg): if not self.__rcpttos: - self.push('503 Error: need RCPT command') + self.push_str('503 Error: need RCPT command', 'ascii') return if arg: - self.push('501 Syntax: DATA') + self.push_str('501 Syntax: DATA', 'ascii') return self.__state = self.DATA - self.set_terminator('\r\n.\r\n') - self.push('354 End data with .') + self.set_terminator(b'\r\n.\r\n') + self.push_str('354 End data with .', 'ascii') Index: Lib/asyncore.py =================================================================== --- Lib/asyncore.py (revision 59392) +++ Lib/asyncore.py (working copy) @@ -52,21 +52,28 @@ import time import os -from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ - ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode +from errno import (EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, + ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EISCONN, + EBADF, ECONNABORTED, errorcode) try: socket_map except NameError: socket_map = {} +def _strerror(err): + res = os.strerror(err) + if res == 'Unknown error': + res = errorcode[err] + return res + class ExitNow(Exception): pass def read(obj): try: obj.handle_read_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -74,15 +81,15 @@ def write(obj): try: obj.handle_write_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() -def _exception (obj): +def _exception(obj): try: obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -95,7 +102,7 @@ obj.handle_write_event() if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL): obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -116,15 +123,15 @@ e.append(fd) if [] == r == w == e: time.sleep(timeout) - else: - try: - r, w, e = select.select(r, w, e, timeout) - except select.error as err: - if err.args[0] != EINTR: - raise - else: - return + try: + r, w, e = select.select(r, w, e, timeout) + except select.error as err: + if err.args[0] != EINTR: + raise + else: + return + for fd in r: obj = map.get(fd) if obj is None: @@ -178,6 +185,23 @@ poll3 = poll2 # Alias for backward compatibility def loop(timeout=30.0, use_poll=False, map=None, count=None): + """The main event-handling loop of asyncore. + + Optional arguments: + timeout -- Number representing seconds, default 30.0 + use_poll -- Use poll instead of select if available, default False + map -- The map of channels to process, default is use built-in map + count -- number of times to cycle through the loop, default infinity + + Note: Setting timeout will not cause loop to terminate after a + fixed amount of time. Rather, it determines how long each select + or poll call will block. To cause loop to terminate after a fixed + amount of time, pass an integer count. The loop will then + terminate after approximately count * timeout seconds, if the + select or poll calls block. + + """ + if map is None: map = socket_map @@ -197,6 +221,28 @@ class dispatcher: + """The abstract base class that users of asyncore should inherit + from. + + A dispatcher object handles a single socket, processing connect, + accept, close, read and write events as defined by the child + handle_connect, handle_accept, handle_close, handle_read and + handle_write methods, respectively. + + The child may also define handle_expt to handle exceptional + conditions during the communication process. Exceptional + conditions are ill-defined, and may or may not include out of band + data, hangup, socket error, and invalid data. + + dispatcher implements many socket methods, so children can send, + recv, accept, bind, listen, etc within as if self were a socket. + dispatcher also has a create_socket method for creating the + internal socket, if necessary. + + Children should always invoke dispatcher.__init__. + + """ + debug = False connected = False accepting = False @@ -204,23 +250,50 @@ addr = None def __init__(self, sock=None, map=None): + """Initialize a dispatcher. + + Children should always invoke dispatcher.__init__ + + Optional arguments: + sock -- A socket to communicate through, default None + map -- A map to register in, default is use built-in map + + If sock is None, create_socket and either connect or + listen/bind must be called to create and initialize the + socket. + + If map is None, the asyncore global map is used. + + """ + if map is None: self._map = socket_map else: self._map = map + self._fileno = None + if sock: + # Set to nonblocking just to make sure for cases where we + # get a socket from a blocking source. + sock.setblocking(0) self.set_socket(sock, map) - # I think it should inherit this anyway - self.socket.setblocking(0) self.connected = True - # XXX Does the constructor require that the socket passed - # be connected? + # The constructor no longer requires that the socket + # passed be connected. try: self.addr = sock.getpeername() except socket.error: - # The addr isn't crucial - pass + if err[0] == ENOTCONN: + # To handle the case where we got an unconnected + # socket. + self.connected = False + else: + # The socket is broken in some unknown way, alert + # the user and remove it from the map (to prevent + # polling of broken sockets). + self.del_channel(map) + raise else: self.socket = None @@ -238,12 +311,24 @@ return '<%s at %#x>' % (' '.join(status), id(self)) def add_channel(self, map=None): + """Adds the dispatcher to the map. + + Called automatically. + + """ + #self.log_info('adding channel %s' % self) if map is None: map = self._map map[self._fileno] = self def del_channel(self, map=None): + """Removes the dispatcher from the map. + + Called automatically by close. + + """ + fd = self._fileno if map is None: map = self._map @@ -253,19 +338,47 @@ self._fileno = None def create_socket(self, family, type): + """Creates and initializes the internal socket. + + Arguments: + family -- socket module AF_* constant + type -- socket module SOCK_* constant + + If an initialized socket is passed to the constructor's conn + argument, or to the set_socket method, there is no need to + call this method. + + """ + self.family_and_type = family, type - self.socket = socket.socket(family, type) - self.socket.setblocking(0) - self._fileno = self.socket.fileno() - self.add_channel() + sock = socket.socket(family, type) + sock.setblocking(0) + self.set_socket(sock) def set_socket(self, sock, map=None): + """Sets the internal socket + + Arguments: + sock -- An initialized socket + + Optional arguments: + map -- The map to use, default is the built-in map + + """ + self.socket = sock ## self.__dict__['socket'] = sock self._fileno = sock.fileno() self.add_channel(map) def set_reuse_addr(self): + """Sets the socket.SO_REUSEADDR socket option, if possible + + This will cause the socket to attempt to re-use a server + port. There is no guarantee of success. + + """ + # try to re-use a server port if possible try: self.socket.setsockopt( @@ -283,9 +396,31 @@ # ================================================== def readable(self): + """Determine whether read events should be generated for this + dispatcher. + + While readable returns False, read events will not be + generated for this dispatcher, and handle_read and + handle_accept will not be called. handle_connect will be + called only if triggered by a write event. + + Default is to return True. + + """ return True def writable(self): + """Determine whether write events should be generated for this + dispatcher. + + While writable returns False, write events will not be + generated for this dispatcher, and handle_write not be called. + handle_connect will be called only if triggered by a read + event. + + Default is to return True. + + """ return True # ================================================== @@ -293,16 +428,19 @@ # ================================================== def listen(self, num): + """Behaves as the similarly-named socket method""" self.accepting = True if os.name == 'nt' and num > 5: - num = 1 + num = 5 return self.socket.listen(num) def bind(self, addr): + """Behaves as the similarly-named socket method""" self.addr = addr return self.socket.bind(addr) def connect(self, address): + """Behaves as the similarly-named socket method""" self.connected = False err = self.socket.connect_ex(address) # XXX Should interpret Winsock return values @@ -310,12 +448,12 @@ return if err in (0, EISCONN): self.addr = address - self.connected = True - self.handle_connect() + self.handle_connect_event() else: raise socket.error(err, errorcode[err]) def accept(self): + """Behaves as the similarly-named socket method""" # XXX can return either an address pair or None try: conn, addr = self.socket.accept() @@ -327,17 +465,21 @@ raise def send(self, data): + """Behaves as the similarly-named socket method""" try: result = self.socket.send(data) return result except socket.error as why: if why[0] == EWOULDBLOCK: return 0 + elif why[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED): + self.handle_close() + return 0 else: raise - return 0 def recv(self, buffer_size): + """Behaves as the similarly-named socket method""" try: data = self.socket.recv(buffer_size) if not data: @@ -349,15 +491,22 @@ return data except socket.error as why: # winsock sometimes throws ENOTCONN - if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]: + if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]: self.handle_close() return b'' else: raise def close(self): + """Behaves as the similarly-named socket method""" + self.connected = False + self.accepting = False self.del_channel() - self.socket.close() + try: + self.socket.close() + except socket.error as why: + if why[0] not in (ENOTCONN, EBADF): + raise # cheap inheritance, used to pass all other attribute # references to the underlying socket object. @@ -376,30 +525,50 @@ print('%s: %s' % (type, message)) def handle_read_event(self): + """Internal method, do not override.""" if self.accepting: - # for an accepting socket, getting a read implies - # that we are connected - if not self.connected: - self.connected = True + # accepting sockets are never connected, they "spawn" new + # sockets that are connected self.handle_accept() elif not self.connected: - self.handle_connect() - self.connected = True + self.handle_connect_event() self.handle_read() else: self.handle_read() + def handle_connect_event(self): + """Internal method, do not override.""" + self.connected = True + self.handle_connect() + def handle_write_event(self): - # getting a write implies that we are connected + """Internal method, do not override.""" + if self.accepting: + # Accepting sockets shouldn't get a write event. + # We will pretend it didn't happen. + return + if not self.connected: - self.handle_connect() - self.connected = True + #check for errors + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err != 0: + raise socket.error(err, strerror(err)) + + self.handle_connect_event() self.handle_write() def handle_expt_event(self): + """Internal method, do not override.""" self.handle_expt() def handle_error(self): + """Called when an exception is raised and not + otherwise handled. + + The default version prints a condensed traceback. + + """ + nil, t, v, tbinfo = compact_traceback() # sometimes a user repr method will crash. @@ -420,21 +589,71 @@ self.close() def handle_expt(self): - self.log_info('unhandled exception', 'warning') + """Called to handle an exceptional condition event on the + socket. + Children may override this method to implement exceptional + condition processing. The default behavior is to raise an + exception. + + """ + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + msg = _strerror(err) + + raise socket.error(err, msg) + def handle_read(self): + """Called to handle a read event. + + When this method is called, a call to self.recv will return + data without blocking. Children may override this method to + implement data reception. + + """ self.log_info('unhandled read event', 'warning') def handle_write(self): + """Called to handle a write event. + + When this method is called, a call to self.send will transmit + data without blocking. Children may override this method to + implement data transmission. + + """ self.log_info('unhandled write event', 'warning') def handle_connect(self): + """Called to handle a connect event. + + When this method is called, the socket has just been + connected. Calls to self.send and self.recv will succeed + without blocking. Children may override this method. + + """ self.log_info('unhandled connect event', 'warning') def handle_accept(self): + """Called to handle an accept event. + + When this method is called, a call to self.accept will return + a new socket without blocking. Children may override this + method to implement server socket listening. + + Note: This event will not be triggered unless listen and bind + have previously been called. Passing a listening socket into + the constructor or set_socket will not work. + + """ self.log_info('unhandled accept event', 'warning') def handle_close(self): + """Called to handle a close event. + + When this method is called, it is time to call self.close. + Children may override this method to implement end-of-life + behavior. + + """ self.log_info('unhandled close event', 'warning') self.close() @@ -444,7 +663,20 @@ # --------------------------------------------------------------------------- class dispatcher_with_send(dispatcher): + """A very basic buffered output implementation on top of + dispatcher. + The send method of this class queues data that are passed to it, + and the handle_write method transmits data from the queue across + the network. Thus you can simply call send with however much data + you like, and dispatcher_with_send will see to it that all of the + data are sent. + + The asynchat module provides a more capable implementation of + this, along with assorted other features. + + """ + def __init__(self, sock=None, map=None): dispatcher.__init__(self, sock, map) self.out_buffer = b'' @@ -474,7 +706,8 @@ def compact_traceback(): t, v, tb = sys.exc_info() tbinfo = [] - assert tb # Must have a traceback + if not tb: # Must have a traceback + raise AssertionError("traceback does not exist") while tb: tbinfo.append(( tb.tb_frame.f_code.co_filename, @@ -490,11 +723,23 @@ info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) return (file, function, line), t, v, info -def close_all(map=None): +def close_all(map=None, ignore_all=False): if map is None: map = socket_map - for x in map.values(): - x.socket.close() + for x in list(map.values()): + try: + x.close() + except OSError as x: + if x[0] == EBADF: + pass + elif not ignore_all: + raise + except (ExitNow, KeyboardInterrupt, SystemExit): + raise + except: + if not ignore_all: + raise + map.clear() # Asynchronous File I/O: @@ -514,11 +759,12 @@ import fcntl class file_wrapper: - # here we override just enough to make a file + # Here we override just enough to make a file # look like a socket for the purposes of asyncore. + # The passed fd is automatically os.dup()'d def __init__(self, fd): - self.fd = fd + self.fd = os.dup(fd) def recv(self, *args): return os.read(self.fd, *args) @@ -540,6 +786,10 @@ def __init__(self, fd, map=None): dispatcher.__init__(self, None, map) self.connected = True + try: + fd = fd.fileno() + except AttributeError: + pass self.set_file(fd) # set it to non-blocking mode flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) Index: Lib/test/test_smtplib.py =================================================================== --- Lib/test/test_smtplib.py (revision 59392) +++ Lib/test/test_smtplib.py (working copy) @@ -300,12 +300,12 @@ # Simulated SMTP channel & server class SimSMTPChannel(smtpd.SMTPChannel): def smtp_EHLO(self, arg): - resp = '250-testhost\r\n' \ - '250-EXPN\r\n' \ - '250-SIZE 20000000\r\n' \ - '250-STARTTLS\r\n' \ - '250-DELIVERBY\r\n' \ - '250 HELP' + resp = b'250-testhost\r\n' \ + b'250-EXPN\r\n' \ + b'250-SIZE 20000000\r\n' \ + b'250-STARTTLS\r\n' \ + b'250-DELIVERBY\r\n' \ + b'250 HELP' self.push(resp) def smtp_VRFY(self, arg): @@ -314,9 +314,9 @@ raw_addr = email.utils.parseaddr(arg)[1] quoted_addr = smtplib.quoteaddr(arg) if raw_addr in sim_users: - self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr)) + self.push_str('250 %s %s' % (sim_users[raw_addr], quoted_addr), 'ascii') else: - self.push('550 No such user: %s' % arg) + self.push_str('550 No such user: %s' % arg, 'ascii') def smtp_EXPN(self, arg): # print '\nsmtp_EXPN(%r)\n' % arg @@ -327,11 +327,11 @@ for n, user_email in enumerate(user_list): quoted_addr = smtplib.quoteaddr(user_email) if n < len(user_list) - 1: - self.push('250-%s %s' % (sim_users[user_email], quoted_addr)) + self.push_str('250-%s %s' % (sim_users[user_email], quoted_addr), 'ascii') else: - self.push('250 %s %s' % (sim_users[user_email], quoted_addr)) + self.push_str('250 %s %s' % (sim_users[user_email], quoted_addr), 'ascii') else: - self.push('550 No access for you!') + self.push_str('550 No access for you!', 'ascii') class SimSMTPServer(smtpd.SMTPServer): Index: Lib/test/test_asyncore.py =================================================================== --- Lib/test/test_asyncore.py (revision 59392) +++ Lib/test/test_asyncore.py (working copy) @@ -6,7 +6,10 @@ import threading import sys import time +import errno +from tempfile import TemporaryFile + from test import test_support from test.test_support import TESTFN, run_unittest, unlink from io import StringIO, BytesIO @@ -28,6 +31,9 @@ def __init__(self): self.socket = dummysocket() + def close(self): + self.socket.close() + class exitingdummy: def __init__(self): pass @@ -303,7 +309,7 @@ stdout = sys.stdout try: sys.stdout = fp - d.handle_expt() + #d.handle_expt() # Exceptional conditions now raise exceptions d.handle_read() d.handle_write() d.handle_connect() @@ -312,8 +318,7 @@ sys.stdout = stdout lines = fp.getvalue().splitlines() - expected = ['warning: unhandled exception', - 'warning: unhandled read event', + expected = ['warning: unhandled read event', 'warning: unhandled write event', 'warning: unhandled connect event', 'warning: unhandled accept event'] @@ -390,8 +395,8 @@ fd = os.open(TESTFN, os.O_RDONLY) w = asyncore.file_wrapper(fd) - self.assertEqual(w.fd, fd) - self.assertEqual(w.fileno(), fd) + self.assertNotEqual(w.fd, fd) + self.assertEqual(w.fileno(), w.fd) self.assertEqual(w.recv(13), b"It's not dead") self.assertEqual(w.read(6), b", it's") w.close() Index: Lib/test/test_asynchat.py =================================================================== --- Lib/test/test_asynchat.py (revision 59392) +++ Lib/test/test_asynchat.py (working copy) @@ -49,12 +49,11 @@ class echo_client(asynchat.async_chat): - def __init__(self, terminator): + def __init__(self): asynchat.async_chat.__init__(self) self.contents = [] self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect((HOST, PORT)) - self.set_terminator(terminator) self.buffer = b"" def handle_connect(self): @@ -88,41 +87,74 @@ s.chunk_size = server_chunk s.start() time.sleep(0.5) # Give server time to initialize - c = echo_client(term) + c = echo_client() + c.set_terminator(term) c.push(b"hello ") - c.push(bytes("world%s" % term, "ascii")) - c.push(bytes("I'm not dead yet!%s" % term, "ascii")) + c.push(b"world" + term) + c.push(b"I'm not dead yet!" + term) c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) s.join() self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + def line_terminator_check_str(self, term, server_chunk): + s = echo_server() + s.chunk_size = server_chunk + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client() + c.set_terminator_str(term, 'utf-8') + c.push_str("\u03C8hello ", 'utf-8') + c.push_str("world%s" % term, "utf-8") + c.push_str("I'm not dead yet!%s" % term, "utf-8") + c.push(SERVER_QUIT) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, ["\u03C8hello world".encode('utf-8'), "I'm not dead yet!".encode('utf-8')]) + # the line terminator tests below check receiving variously-sized # chunks back from the server in order to exercise all branches of # async_chat.handle_read def test_line_terminator1(self): + # test one-byte terminator + for l in (1,2,3): + self.line_terminator_check(b'\n', l) + + def test_line_terminator2(self): + # test two-byte terminator + for l in (1,2,3): + self.line_terminator_check(b'\r\n', l) + + def test_line_terminator3(self): + # test three-byte terminator + for l in (1,2,3): + self.line_terminator_check(b'qqq', l) + + def test_line_terminator1_str(self): # test one-character terminator for l in (1,2,3): - self.line_terminator_check('\n', l) + self.line_terminator_check_str('\u03C9', l) - def test_line_terminator2(self): + def test_line_terminator2_str(self): # test two-character terminator for l in (1,2,3): - self.line_terminator_check('\r\n', l) + self.line_terminator_check_str('\u03C9\u03C9', l) - def test_line_terminator3(self): + def test_line_terminator3_str(self): # test three-character terminator for l in (1,2,3): - self.line_terminator_check('qqq', l) + self.line_terminator_check_str('\u03C9\u03C9\u03C9', l) def numeric_terminator_check(self, termlen): # Try reading a fixed number of bytes s = echo_server() s.start() time.sleep(0.5) # Give server time to initialize - c = echo_client(termlen) + c = echo_client() + c.set_terminator(termlen) data = b"hello world, I'm not dead yet!\n" c.push(data) c.push(SERVER_QUIT) @@ -144,7 +176,8 @@ s = echo_server() s.start() time.sleep(0.5) # Give server time to initialize - c = echo_client(None) + c = echo_client() + c.set_terminator(None) data = b"hello world, I'm not dead yet!\n" c.push(data) c.push(SERVER_QUIT) @@ -154,11 +187,12 @@ self.assertEqual(c.contents, []) self.assertEqual(c.buffer, data) - def test_simple_producer(self): + def test_push_with_producer(self): s = echo_server() s.start() time.sleep(0.5) # Give server time to initialize - c = echo_client(b'\n') + c = echo_client() + c.set_terminator(b'\n') data = b"hello world\nI'm not dead yet!\n" p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) c.push_with_producer(p) @@ -167,13 +201,53 @@ self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) - def test_string_producer(self): + def test_push_iterable(self): s = echo_server() s.start() time.sleep(0.5) # Give server time to initialize - c = echo_client(b'\n') + c = echo_client() + c.set_terminator(b'\n') + def producer(): + yield b'hello world\n' + yield b"I'm not dead yet!\n" + yield SERVER_QUIT + c.push_iterable(producer()) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"]) + + def test_push_two_iterables(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client() + c.set_terminator(b'\n') + def producer1(): + yield b'hello world\n' + yield b"I'm not dead yet!\n" + def producer2(): + yield b"I don't want to go on the cart.\n" + yield b"I feel happy!\n" + yield SERVER_QUIT + c.push_iterable(producer1()) + c.push_iterable(producer2()) + asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) + s.join() + + self.assertEqual(c.contents, [b"hello world", + b"I'm not dead yet!", + b"I don't want to go on the cart.", + b"I feel happy!"]) + + def test_push(self): + s = echo_server() + s.start() + time.sleep(0.5) # Give server time to initialize + c = echo_client() + c.set_terminator(b'\n') data = b"hello world\nI'm not dead yet!\n" - c.push_with_producer(data+SERVER_QUIT) + c.push(data+SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) s.join() @@ -184,7 +258,8 @@ s = echo_server() s.start() time.sleep(0.5) # Give server time to initialize - c = echo_client(b'\n') + c = echo_client() + c.set_terminator(b'\n') c.push(b"hello world\n\nI'm not dead yet!\n") c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) @@ -197,7 +272,8 @@ s = echo_server() s.start() time.sleep(0.5) # Give server time to initialize - c = echo_client(b'\n') + c = echo_client() + c.set_terminator(b'\n') c.push(b"hello world\nI'm not dead yet!\n") c.push(SERVER_QUIT) c.close_when_done() @@ -216,37 +292,12 @@ class TestHelperFunctions(unittest.TestCase): def test_find_prefix_at_end(self): - self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1) - self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0) + self.assertEqual(asynchat.find_prefix_at_end(b"qwerty\r", b"\r\n"), 1) + self.assertEqual(asynchat.find_prefix_at_end(b"qwertydkjf", b"\r\n"), 0) -class TestFifo(unittest.TestCase): - def test_basic(self): - f = asynchat.fifo() - f.push(7) - f.push(b'a') - self.assertEqual(len(f), 2) - self.assertEqual(f.first(), 7) - self.assertEqual(f.pop(), (1, 7)) - self.assertEqual(len(f), 1) - self.assertEqual(f.first(), b'a') - self.assertEqual(f.is_empty(), False) - self.assertEqual(f.pop(), (1, b'a')) - self.assertEqual(len(f), 0) - self.assertEqual(f.is_empty(), True) - self.assertEqual(f.pop(), (0, None)) - - def test_given_list(self): - f = asynchat.fifo([b'x', 17, 3]) - self.assertEqual(len(f), 3) - self.assertEqual(f.pop(), (1, b'x')) - self.assertEqual(f.pop(), (1, 17)) - self.assertEqual(f.pop(), (1, 3)) - self.assertEqual(f.pop(), (0, None)) - - def test_main(verbose=None): test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll, - TestHelperFunctions, TestFifo) + TestHelperFunctions) if __name__ == "__main__": test_main(verbose=True)