--- queues.py Tue Mar 02 18:14:16 2010 +++ queues_pickled.py Tue Mar 02 18:08:42 2010 @@ -16,6 +16,7 @@ import atexit import weakref +from cPickle import dumps, HIGHEST_PROTOCOL from Queue import Empty, Full import _multiprocessing from multiprocessing import Pipe @@ -29,10 +30,11 @@ class Queue(object): - def __init__(self, maxsize=0): + def __init__(self, maxsize=0, bufferPickled=False): if maxsize <= 0: maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX self._maxsize = maxsize + self._bufferPickled = bufferPickled self._reader, self._writer = Pipe(duplex=False) self._rlock = Lock() self._opid = os.getpid() @@ -49,11 +51,11 @@ def __getstate__(self): assert_spawning(self) - return (self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) + return (self._maxsize, self._bufferPickled, self._reader, + self._writer, self._rlock, self._wlock, self._sem, self._opid) def __setstate__(self, state): - (self._maxsize, self._reader, self._writer, + (self._maxsize, self._bufferPickled, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state self._after_fork() @@ -66,7 +68,10 @@ self._joincancelled = False self._closed = False self._close = None - self._send = self._writer.send + if self._bufferPickled: + self._send = self._writer.send_bytes + else: + self._send = self._writer.send self._recv = self._reader.recv self._poll = self._reader.poll @@ -79,7 +84,10 @@ try: if self._thread is None: self._start_thread() - self._buffer.append(obj) + if self._bufferPickled: + self._buffer.append(dumps(obj, HIGHEST_PROTOCOL)) + else: + self._buffer.append(obj) self._notempty.notify() finally: self._notempty.release()