Index: Lib/multiprocessing/pool.py =================================================================== --- Lib/multiprocessing/pool.py (Revision 67029) +++ Lib/multiprocessing/pool.py (Arbeitskopie) @@ -127,7 +127,7 @@ ) def _setup_queues(self): - from .queues import SimpleQueue + from multiprocessing.queues import SimpleQueue self._inqueue = SimpleQueue() self._outqueue = SimpleQueue() self._quick_put = self._inqueue._writer.send @@ -213,7 +213,7 @@ @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool): - thread = threading.current_thread() + thread = threading.currentThread() for taskseq, set_length in iter(taskqueue.get, None): i = -1 @@ -252,7 +252,7 @@ @staticmethod def _handle_results(outqueue, get, cache): - thread = threading.current_thread() + thread = threading.currentThread() while 1: try: @@ -346,7 +346,7 @@ # task_handler may be blocked trying to put items on inqueue debug('removing tasks from inqueue until task handler finished') inqueue._rlock.acquire() - while task_handler.is_alive() and inqueue._reader.poll(): + while task_handler.isAlive() and inqueue._reader.poll(): inqueue._reader.recv() time.sleep(0) @@ -362,7 +362,7 @@ debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) - assert result_handler.is_alive() or len(cache) == 0 + assert result_handler.isAlive() or len(cache) == 0 result_handler._state = TERMINATE outqueue.put(None) # sentinel @@ -573,7 +573,7 @@ class ThreadPool(Pool): - from .dummy import Process + from multiprocessing.dummy import Process def __init__(self, processes=None, initializer=None, initargs=()): Pool.__init__(self, processes, initializer, initargs) @@ -591,6 +591,6 @@ try: inqueue.queue.clear() inqueue.queue.extend([None] * size) - inqueue.not_empty.notify_all() + inqueue.not_empty.notifyAll() finally: inqueue.not_empty.release() Index: Lib/multiprocessing/synchronize.py =================================================================== --- Lib/multiprocessing/synchronize.py (Revision 67029) +++ Lib/multiprocessing/synchronize.py (Arbeitskopie) @@ -120,8 +120,8 @@ try: if self._semlock._is_mine(): name = current_process().name - if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() elif self._semlock._get_value() == 1: name = 'None' elif self._semlock._count() > 0: @@ -145,8 +145,8 @@ try: if self._semlock._is_mine(): name = current_process().name - if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() count = self._semlock._count() elif self._semlock._get_value() == 1: name, count = 'None', 0 @@ -257,6 +257,8 @@ # rezero wait_semaphore in case some timeouts just happened while self._wait_semaphore.acquire(False): pass + + notifyAll = notify_all # # Event @@ -283,7 +285,7 @@ try: self._flag.acquire(False) self._flag.release() - self._cond.notify_all() + self._cond.notifyAll() finally: self._cond.release() Index: Lib/multiprocessing/reduction.py =================================================================== --- Lib/multiprocessing/reduction.py (Revision 67029) +++ Lib/multiprocessing/reduction.py (Arbeitskopie) @@ -34,7 +34,7 @@ if sys.platform == 'win32': import _subprocess - from ._multiprocessing import win32 + from multiprocessing._multiprocessing import win32 def send_handle(conn, handle, destination_pid): process_handle = win32.OpenProcess( @@ -91,7 +91,7 @@ return _listener def _serve(): - from .util import is_exiting, sub_warning + from multiprocessing.util import is_exiting, sub_warning while 1: try: Index: Lib/multiprocessing/heap.py =================================================================== --- Lib/multiprocessing/heap.py (Revision 67029) +++ Lib/multiprocessing/heap.py (Arbeitskopie) @@ -7,13 +7,17 @@ # import bisect -import mmap import tempfile import os import sys import threading import itertools +if sys.version_info < (2, 5): + import multiprocessing._mmap25 as mmap +else: + import mmap + import _multiprocessing from multiprocessing.util import Finalize, info from multiprocessing.forking import assert_spawning @@ -26,7 +30,7 @@ if sys.platform == 'win32': - from ._multiprocessing import win32 + from multiprocessing._multiprocessing import win32 class Arena(object): Index: Lib/multiprocessing/managers.py =================================================================== --- Lib/multiprocessing/managers.py (Revision 67029) +++ Lib/multiprocessing/managers.py (Arbeitskopie) @@ -31,6 +31,9 @@ except ImportError: from pickle import PicklingError +# forward compatibility +bytes = str + # # Register some things for pickling # @@ -201,7 +204,7 @@ Handle requests from the proxies in a particular process/thread ''' util.debug('starting server thread to service %r', - threading.current_thread().name) + threading.currentThread().getName()) recv = conn.recv send = conn.send @@ -251,7 +254,7 @@ except EOFError: util.debug('got EOF -- exiting thread serving %r', - threading.current_thread().name) + threading.currentThread().getName()) sys.exit(0) except Exception: @@ -264,7 +267,7 @@ send(('#UNSERIALIZABLE', repr(msg))) except Exception, e: util.info('exception in thread serving %r', - threading.current_thread().name) + threading.currentThread().getName()) util.info(' ... message was %r', msg) util.info(' ... exception was %r', e) conn.close() @@ -392,7 +395,7 @@ ''' Spawn a new thread to serve this connection ''' - threading.current_thread().name = name + threading.currentThread().setName(name) c.send(('#RETURN', None)) self.serve_client(c) @@ -573,7 +576,7 @@ ''' Shutdown the manager process; will be registered as a finalizer ''' - if process.is_alive(): + if process.isAlive(): util.info('sending shutdown message to manager') try: conn = _Client(address, authkey=authkey) @@ -585,13 +588,13 @@ pass process.join(timeout=0.2) - if process.is_alive(): + if process.isAlive(): util.info('manager still alive') if hasattr(process, 'terminate'): util.info('trying to `terminate()` manager process') process.terminate() process.join(timeout=0.1) - if process.is_alive(): + if process.isAlive(): util.info('manager still alive after terminate') state.value = State.SHUTDOWN @@ -704,8 +707,8 @@ def _connect(self): util.debug('making connection to manager') name = current_process().name - if threading.current_thread().name != 'MainThread': - name += '|' + threading.current_thread().name + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() conn = self._Client(self._token.address, authkey=self._authkey) dispatch(conn, None, 'accept_connection', (name,)) self._tls.connection = conn @@ -718,7 +721,7 @@ conn = self._tls.connection except AttributeError: util.debug('thread %r does not own a connection', - threading.current_thread().name) + threading.currentThread().getName()) self._connect() conn = self._tls.connection @@ -782,7 +785,7 @@ # the process owns no more references to objects for this manager if not idset and hasattr(tls, 'connection'): util.debug('thread %r has no more proxies so closing conn', - threading.current_thread().name) + threading.currentThread().getName()) tls.connection.close() del tls.connection @@ -961,14 +964,17 @@ class ConditionProxy(AcquirerProxy): # XXX will Condition.notfyAll() name be available in Py3.0? - _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all') + _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all', 'notifyAll') def wait(self, timeout=None): return self._callmethod('wait', (timeout,)) def notify(self): return self._callmethod('notify') def notify_all(self): - return self._callmethod('notify_all') + return self._callmethod('notifyAll') + def notifyAll(self): + return self._callmethod('notifyAll') + class EventProxy(BaseProxy): _exposed_ = ('is_set', 'set', 'clear', 'wait') def is_set(self): Index: Lib/multiprocessing/process.py =================================================================== --- Lib/multiprocessing/process.py (Revision 67029) +++ Lib/multiprocessing/process.py (Arbeitskopie) @@ -26,6 +26,9 @@ except OSError: ORIGINAL_DIR = None +# forward compatibility +bytes = str + # # Public functions # @@ -100,7 +103,7 @@ if self._Popen is not None: Popen = self._Popen else: - from .forking import Popen + from multiprocessing.forking import Popen self._popen = Popen(self) _current_process._children.add(self) @@ -131,42 +134,44 @@ return False self._popen.poll() return self._popen.returncode is None - - @property - def name(self): + + isAlive = is_alive + + def _get_name(self): return self._name - @name.setter - def name(self, name): + def _set_name(self, name): assert isinstance(name, str), 'name must be a string' self._name = name - @property - def daemon(self): + name = property(_get_name, _set_name) + + def _get_daemon(self): ''' Return whether process is a daemon ''' return self._daemonic - @daemon.setter - def daemon(self, daemonic): + def _set_daemon(self, daemonic): ''' Set whether process is a daemon ''' assert self._popen is None, 'process has already started' self._daemonic = daemonic - @property - def authkey(self): + daemon = property(_get_daemon, _set_daemon) + + def _get_authkey(self): return self._authkey - @authkey.setter - def authkey(self, authkey): + def _set_authkey(self, authkey): ''' Set authorization key of process ''' self._authkey = AuthenticationString(authkey) + authkey = property(_get_authkey, _set_authkey) + @property def exitcode(self): ''' @@ -213,7 +218,7 @@ ## def _bootstrap(self): - from . import util + from multiprocessing import util global _current_process try: @@ -255,15 +260,15 @@ # We subclass bytes to avoid accidental transmission of auth keys over network # -class AuthenticationString(bytes): +class AuthenticationString(str): def __reduce__(self): - from .forking import Popen + from multiprocessing.forking import Popen if not Popen.thread_is_spawning(): raise TypeError( 'Pickling an AuthenticationString object is ' 'disallowed for security reasons' ) - return AuthenticationString, (bytes(self),) + return AuthenticationString, (str(self),) # # Create object representing the main process Index: Lib/multiprocessing/__init__.py =================================================================== --- Lib/multiprocessing/__init__.py (Revision 67029) +++ Lib/multiprocessing/__init__.py (Arbeitskopie) @@ -81,6 +81,9 @@ # This is down here because _multiprocessing uses BufferTooShort import _multiprocessing +# alias for forward compatibility +sys.modules['_multiprocessing'] = _multiprocessing + # # Definitions not depending on native semaphores # Index: Lib/multiprocessing/connection.py =================================================================== --- Lib/multiprocessing/connection.py (Revision 67029) +++ Lib/multiprocessing/connection.py (Arbeitskopie) @@ -21,6 +21,8 @@ from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug from multiprocessing.forking import duplicate, close +# forward compatibility +bytes = str # # @@ -164,7 +166,7 @@ else: - from ._multiprocessing import win32 + from multiprocessing._multiprocessing import win32 def Pipe(duplex=True): ''' @@ -348,9 +350,9 @@ MESSAGE_LENGTH = 20 -CHALLENGE = b'#CHALLENGE#' -WELCOME = b'#WELCOME#' -FAILURE = b'#FAILURE#' +CHALLENGE = '#CHALLENGE#' +WELCOME = '#WELCOME#' +FAILURE = '#FAILURE#' def deliver_challenge(connection, authkey): import hmac Index: Lib/multiprocessing/forking.py =================================================================== --- Lib/multiprocessing/forking.py (Revision 67029) +++ Lib/multiprocessing/forking.py (Arbeitskopie) @@ -149,9 +149,14 @@ import msvcrt import _subprocess import time + try: + from _subprocess import TerminateProcess + except ImportError: # Python 2.4 + from win32process import TerminateProcess + - from ._multiprocessing import win32, Connection, PipeConnection - from .util import Finalize + from multiprocessing._multiprocessing import win32, Connection, PipeConnection + from multiprocessing.util import Finalize #try: # from cPickle import dump, load, HIGHEST_PROTOCOL @@ -271,7 +276,7 @@ def terminate(self): if self.returncode is None: try: - _subprocess.TerminateProcess(int(self._handle), TERMINATE) + TerminateProcess(int(self._handle), TERMINATE) except WindowsError: if self.wait(timeout=0.1) is None: raise @@ -352,7 +357,7 @@ ''' Return info about parent needed by child to unpickle process object ''' - from .util import _logger, _log_to_stderr + from multiprocessing.util import _logger, _log_to_stderr d = dict( name=name, Index: Lib/multiprocessing/dummy/__init__.py =================================================================== --- Lib/multiprocessing/dummy/__init__.py (Revision 67029) +++ Lib/multiprocessing/dummy/__init__.py (Arbeitskopie) @@ -21,6 +21,7 @@ import weakref import array import itertools +import sys from multiprocessing import TimeoutError, cpu_count from multiprocessing.dummy.connection import Pipe @@ -49,30 +50,33 @@ @property def exitcode(self): - if self._start_called and not self.is_alive(): + if self._start_called and not self.isAlive(): return 0 else: return None + + if sys.version_info < (2,6): + name = property(threading.Thread.getName, threading.Thread.setName) # # # class Condition(threading._Condition): - notify_all = threading._Condition.notify_all.im_func + notify_all = threading._Condition.notifyAll.im_func # # # Process = DummyProcess -current_process = threading.current_thread +current_process = threading.currentThread current_process()._children = weakref.WeakKeyDictionary() def active_children(): children = current_process()._children for p in list(children): - if not p.is_alive(): + if not p.isAlive(): children.pop(p, None) return list(children) Index: Lib/multiprocessing/queues.py =================================================================== --- Lib/multiprocessing/queues.py (Revision 67029) +++ Lib/multiprocessing/queues.py (Arbeitskopie) @@ -205,7 +205,7 @@ @staticmethod def _feed(buffer, notempty, send, writelock, close): debug('starting thread to feed data to pipe') - from .util import is_exiting + from multiprocessing.util import is_exiting nacquire = notempty.acquire nrelease = notempty.release @@ -292,7 +292,7 @@ if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): - self._cond.notify_all() + self._cond.notifyAll() finally: self._cond.release() Index: Lib/test/test_multiprocessing.py =================================================================== --- Lib/test/test_multiprocessing.py (Revision 67029) +++ Lib/test/test_multiprocessing.py (Arbeitskopie) @@ -41,6 +41,10 @@ latin = str +# forward compatibility +if sys.version_info < (2, 6): + bytes = str + # # Constants # @@ -129,7 +133,7 @@ current = self.current_process() authkey = current.authkey - self.assertTrue(current.is_alive()) + self.assertTrue(current.isAlive()) self.assertTrue(not current.daemon) self.assertTrue(isinstance(authkey, bytes)) self.assertTrue(len(authkey) > 0) @@ -159,7 +163,7 @@ if self.TYPE != 'threads': self.assertEquals(p.authkey, current.authkey) - self.assertEquals(p.is_alive(), False) + self.assertEquals(p.isAlive(), False) self.assertEquals(p.daemon, True) self.assertTrue(p not in self.active_children()) self.assertTrue(type(self.active_children()) is list) @@ -168,7 +172,7 @@ p.start() self.assertEquals(p.exitcode, None) - self.assertEquals(p.is_alive(), True) + self.assertEquals(p.isAlive(), True) self.assertTrue(p in self.active_children()) self.assertEquals(q.get(), args[1:]) @@ -181,7 +185,7 @@ p.join() self.assertEquals(p.exitcode, 0) - self.assertEquals(p.is_alive(), False) + self.assertEquals(p.isAlive(), False) self.assertTrue(p not in self.active_children()) def _test_terminate(self): @@ -195,7 +199,7 @@ p.daemon = True p.start() - self.assertEqual(p.is_alive(), True) + self.assertEqual(p.isAlive(), True) self.assertTrue(p in self.active_children()) self.assertEqual(p.exitcode, None) @@ -205,7 +209,7 @@ self.assertEqual(join(), None) self.assertTimingAlmostEqual(join.elapsed, 0.0) - self.assertEqual(p.is_alive(), False) + self.assertEqual(p.isAlive(), False) self.assertTrue(p not in self.active_children()) p.join() @@ -713,7 +717,7 @@ # wake them all up cond.acquire() - cond.notify_all() + cond.notifyAll() cond.release() # check they have all woken @@ -1225,7 +1229,10 @@ len(arr) * buffer.itemsize) self.assertEqual(list(buffer), expected) - buffer = bytearray(latin(' ' * 40)) + if sys.version_info < (2, 6): + buffer = array.array('i', [0] * 10) + else: + buffer = bytearray(latin(' ' * 40)) self.assertEqual(conn.send_bytes(longmsg), None) try: res = conn.recv_bytes_into(buffer) @@ -1601,7 +1608,7 @@ self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) # -# Test that from ... import * works for each module +# Test that from multiprocessing... import * works for each module # class _TestImportStar(BaseTestCase): @@ -1749,12 +1756,12 @@ def test_deliver_challenge_auth_failure(self): class _FakeConnection(object): def recv_bytes(self, size): - return b'something bogus' + return 'something bogus' def send_bytes(self, data): pass self.assertRaises(multiprocessing.AuthenticationError, multiprocessing.connection.deliver_challenge, - _FakeConnection(), b'abc') + _FakeConnection(), 'abc') def test_answer_challenge_auth_failure(self): class _FakeConnection(object): @@ -1765,13 +1772,13 @@ if self.count == 1: return multiprocessing.connection.CHALLENGE elif self.count == 2: - return b'something bogus' - return b'' + return 'something bogus' + return '' def send_bytes(self, data): pass self.assertRaises(multiprocessing.AuthenticationError, multiprocessing.connection.answer_challenge, - _FakeConnection(), b'abc') + _FakeConnection(), 'abc') testcases_other = [OtherTest] Index: Modules/_multiprocessing/connection.h =================================================================== --- Modules/_multiprocessing/connection.h (Revision 67029) +++ Modules/_multiprocessing/connection.h (Arbeitskopie) @@ -181,6 +181,7 @@ return result; } +#ifdef HAS_NEW_PY_BUFFER static PyObject * connection_recvbytes_into(ConnectionObject *self, PyObject *args) { @@ -248,6 +249,73 @@ goto _cleanup; } +#else /* old buffer protocol */ + +static PyObject * +connection_recvbytes_into(ConnectionObject *self, PyObject *args) +{ + char *freeme = NULL, *buffer = NULL; + Py_ssize_t res, offset = 0; + int length = 0; + PyObject *result = NULL; + + CHECK_READABLE(self); + + if (!PyArg_ParseTuple(args, "w#|" F_PY_SSIZE_T, + &buffer, &length, &offset)) + return NULL; + + if (offset < 0) { + PyErr_SetString(PyExc_ValueError, "negative offset"); + goto _error; + } + + if (offset > length) { + PyErr_SetString(PyExc_ValueError, "offset too large"); + goto _error; + } + + res = conn_recv_string(self, buffer+offset, length-offset, + &freeme, PY_SSIZE_T_MAX); + + if (res < 0) { + if (res == MP_BAD_MESSAGE_LENGTH) { + if ((self->flags & WRITABLE) == 0) { + Py_BEGIN_ALLOW_THREADS + CLOSE(self->handle); + Py_END_ALLOW_THREADS + self->handle = INVALID_HANDLE_VALUE; + } else { + self->flags = WRITABLE; + } + } + mp_SetError(PyExc_IOError, res); + } else { + if (freeme == NULL) { + result = PyInt_FromSsize_t(res); + } else { + result = PyObject_CallFunction(BufferTooShort, + F_RBUFFER "#", + freeme, res); + PyMem_Free(freeme); + if (result) { + PyErr_SetObject(BufferTooShort, result); + Py_DECREF(result); + } + goto _error; + } + } + +_cleanup: + return result; + +_error: + result = NULL; + goto _cleanup; +} +#endif /* buffer */ + + /* * Functions for transferring objects */ Index: Modules/_multiprocessing/multiprocessing.h =================================================================== --- Modules/_multiprocessing/multiprocessing.h (Revision 67029) +++ Modules/_multiprocessing/multiprocessing.h (Arbeitskopie) @@ -146,6 +146,16 @@ #endif /* + * Python 2.5 compatibility + */ + +#if PY_VERSION_HEX < 0x02060000 +#else +# define HAS_NEW_PY_BUFFER 1 +#endif + + +/* * Connection definition */