=== modified file 'Doc/library/multiprocessing.rst' --- Doc/library/multiprocessing.rst 2008-06-28 01:42:41 +0000 +++ Doc/library/multiprocessing.rst 2008-07-07 22:14:46 +0000 @@ -28,11 +28,11 @@ from multiprocessing import Process def f(name): - print 'hello', name + print('hello', name) if __name__ == '__main__': p = Process(target=f, args=('bob',)) - p.start() + p.start() # prints 'hello bob' p.join() Here the function ``f`` is run in a child process. @@ -62,7 +62,7 @@ q = Queue() p = Process(target=f, args=(q,)) p.start() - print q.get() # prints "[42, None, 'hello']" + print(q.get()) # prints "[42, None, 'hello']" p.join() Queues are thread and process safe. @@ -82,7 +82,7 @@ parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() - print parent_conn.recv() # prints "[42, None, 'hello']" + print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join() The two connection objects returned by :func:`Pipe` represent the two ends of @@ -99,13 +99,14 @@ :mod:`multiprocessing` contains equivalents of all the synchronization primitives from :mod:`threading`. For instance one can use a lock to ensure -that only one process prints to standard output at a time:: +that only one process prints to standard output at a time. For example, this +code:: from multiprocessing import Process, Lock def f(l, i): l.acquire() - print 'hello world', i + print('hello world', i) l.release() if __name__ == '__main__': @@ -113,6 +114,19 @@ for num in range(10): Process(target=f, args=(lock, num)).start() + +will print :: + + hello world 0 + hello world 1 + hello world 2 + hello world 3 + hello world 4 + hello world 5 + hello world 6 + hello world 7 + hello world 8 + hello world 9 Without using the lock output from the different processes is liable to get all mixed up. @@ -148,8 +162,8 @@ p.start() p.join() - print num.value - print arr[:] + print(num.value) + print(arr[:]) will print :: @@ -195,8 +209,8 @@ p.start() p.join() - print d - print l + print(d) + print(l) will print :: @@ -224,10 +238,11 @@ return x*x if __name__ == '__main__': - pool = Pool(processes=4) # start 4 worker processes - result = pool.applyAsync(f, [10]) # evaluate "f(10)" asynchronously - print result.get(timeout=1) # prints "100" unless your computer is *very* slow - print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]" + pool = Pool(processes=4) # start 4 worker processes + result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously + print(result.get(timeout=1)) # prints "100" unless your + # computer is *very* slow + print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" Reference @@ -329,18 +344,17 @@ In addition process objects also support the following methods: - .. method:: get_pid() - - Return the process ID. Before the process is spawned, this will be - ``None``. - - .. method:: get_exit_code() + .. method:: get_ident() + + Return indentifier (PID) of process or ``None`` if it has yet to start. + + .. method:: get_exitcode() Return the child's exit code. This will be ``None`` if the process has not yet terminated. A negative value *-N* indicates that the child was terminated by signal *N*. - .. method:: get_auth_key() + .. method:: get_authkey() Return the process's authentication key (a byte string). @@ -353,7 +367,7 @@ See :ref:`multiprocessing-auth-keys`. - .. method:: set_auth_key(authkey) + .. method:: set_authkey(authkey) Set the process's authentication key which must be a byte string. @@ -380,17 +394,17 @@ Example usage of some of the methods of :class:`Process`:: - >>> import processing, time, signal - >>> p = processing.Process(target=time.sleep, args=(1000,)) - >>> print p, p.is_alive() + >>> import multiprocessing, time, signal + >>> p = mutliprocessing.Process(target=time.sleep, args=(1000,)) + >>> print(p, p.is_alive()) False >>> p.start() - >>> print p, p.is_alive() + >>> print(p, p.is_alive()) True >>> p.terminate() - >>> print p, p.is_alive() + >>> print(p, p.is_alive()) False - >>> p.get_exit_code() == -signal.SIGTERM + >>> p.get_exitcode() == -signal.SIGTERM True @@ -504,13 +518,13 @@ .. method:: put(item[, block[, timeout]]) Put item into the queue. If the optional argument *block* is ``True`` - (the default) and *timeout* is ``None`` (the default), block if necessary until - a free slot is available. If *timeout* is a positive number, it blocks at - most *timeout* seconds and raises the :exc:`queue.Full` exception if no - free slot was available within that time. Otherwise (*block* is - ``False``), put an item on the queue if a free slot is immediately - available, else raise the :exc:`queue.Full` exception (*timeout* is - ignored in that case). + (the default) and *timeout* is ``None`` (the default), block if necessary + until a free slot is available. If *timeout* is a positive number, it + blocks at most *timeout* seconds and raises the :exc:`queue.Full` + exception if no free slot was available within that time. Otherwise + (*block* is ``False``), put an item on the queue if a free slot is + immediately available, else raise the :exc:`queue.Full` exception + (*timeout* is ignored in that case). .. method:: put_nowait(item) @@ -527,7 +541,6 @@ :exc:`queue.Empty` exception (*timeout* is ignored in that case). .. method:: get_nowait() - get_no_wait() Equivalent to ``get(False)``. @@ -622,7 +635,7 @@ from multiprocessing import Process, freeze_support def f(): - print 'hello world!' + print('hello world!') if __name__ == '__main__': freeze_support() @@ -734,12 +747,12 @@ >>> from multiprocessing import Pipe >>> a, b = Pipe() - >>> a.send([1, 'hello', None]) + >>> a.send([42, None, 'hello']) >>> b.recv() - [1, 'hello', None] - >>> b.send_bytes('thank you') + [42, None, 'hello'] + >>> b.send_bytes(b'thank you') >>> a.recv_bytes() - 'thank you' + b'thank you' >>> import array >>> arr1 = array.array('i', range(5)) >>> arr2 = array.array('i', [0] * 10) @@ -845,12 +858,10 @@ ctypes type or a one character typecode of the kind used by the :mod:`array` module. *\*args* is passed on to the constructor for the type. - If *lock* is ``True`` (the default) then a new lock object is created to - synchronize access to the value. If *lock* is a :class:`Lock` or + If *lock* is ``None`` (the default) then a new :class:`RLock` object is + created to synchronize access to the value. If *lock* is a :class:`Lock` or :class:`RLock` object then that will be used to synchronize access to the - value. If *lock* is ``False`` then access to the returned object will not be - automatically protected by a lock, so it will not necessarily be - "process-safe". + value. Note that *lock* is a keyword-only argument. @@ -866,12 +877,10 @@ Otherwise, *size_or_initializer* is a sequence which is used to initialize the array and whose length determines the length of the array. - If *lock* is ``True`` (the default) then a new lock object is created to - synchronize access to the value. If *lock* is a :class:`Lock` or + If *lock* is ``None`` (the default) then a new :class:`RLock` object is + created to synchronize access to the value. If *lock* is a :class:`Lock` or :class:`RLock` object then that will be used to synchronize access to the - value. If *lock* is ``False`` then access to the returned object will not be - automatically protected by a lock, so it will not necessarily be - "process-safe". + value. Note that *lock* is a keyword only argument. @@ -934,12 +943,10 @@ process-safe synchronization wrapper may be returned instead of a raw ctypes array. - If *lock* is ``True`` (the default) then a new lock object is created to - synchronize access to the value. If *lock* is a :class:`Lock` or + If *lock* is ``None`` (the default) then a new :class:`RLock` object is + created to synchronize access to the value. If *lock* is a :class:`Lock` or :class:`RLock` object then that will be used to synchronize access to the - value. If *lock* is ``False`` then access to the returned object will not be - automatically protected by a lock, so it will not necessarily be - "process-safe". + value. Note that *lock* is a keyword-only argument. @@ -949,12 +956,10 @@ process-safe synchronization wrapper may be returned instead of a raw ctypes object. - If *lock* is ``True`` (the default) then a new lock object is created to - synchronize access to the value. If *lock* is a :class:`Lock` or + If *lock* is ``None`` (the default) then a new :class:`RLock` object is + created to synchronize access to the value. If *lock* is a :class:`Lock` or :class:`RLock` object then that will be used to synchronize access to the - value. If *lock* is ``False`` then access to the returned object will not be - automatically protected by a lock, so it will not necessarily be - "process-safe". + value. Note that *lock* is a keyword-only argument. @@ -1013,18 +1018,18 @@ lock = Lock() n = Value('i', 7) - x = Value(ctypes.c_double, 1.0/3.0, lock=False) - s = Array('c', 'hello world', lock=lock) - A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock) + x = Value(c_double, 1/3) + s = Array('c', b'hello world', lock=lock) + A = Array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)], lock=lock) p = Process(target=modify, args=(n, x, s, A)) p.start() p.join() - print n.value - print x.value - print s.value - print [(a.x, a.y) for a in A] + print(n.value) + print(x.value) + print(s.value) + print([(a.x, a.y) for a in A]) .. highlightlang:: none @@ -1033,7 +1038,7 @@ 49 0.1111111111111111 - HELLO WORLD + b'HELLO WORLD' [(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)] .. highlightlang:: python @@ -1081,22 +1086,6 @@ Start a subprocess to start the manager. - .. method:: server_forever() - - Run the server in the current process. - - .. method:: from_address(address, authkey) - - A class method which creates a manager object referring to a pre-existing - server process which is using the given address and authentication key. - - .. method:: shutdown() - - Stop the process used by the manager. This is only available if - :meth:`start` has been used to start the server process. - - This can be called multiple times. - .. method:: register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]]) A classmethod which can be used for registering a type or callable with @@ -1222,7 +1211,7 @@ >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy - >>> print Global + >>> print(Global) Namespace(x=10, y='hello') @@ -1250,8 +1239,8 @@ manager = MyManager() manager.start() maths = manager.Maths() - print maths.add(4, 3) # prints 7 - print maths.mul(7, 8) # prints 56 + print(maths.add(4, 3)) # prints 7 + print(maths.mul(7, 8)) # prints 56 Using a remote manager @@ -1265,33 +1254,34 @@ >>> from multiprocessing.managers import BaseManager >>> import queue - >>> queue = queue.Queue() + >>> q = queue.Queue() >>> class QueueManager(BaseManager): pass ... - >>> QueueManager.register('getQueue', callable=lambda:queue) - >>> m = QueueManager(address=('', 50000), authkey='abracadabra') - >>> m.serveForever() + >>> QueueManager.register('get_queue', callable=lambda: q) + >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra') + >>> m.get_server().serve_forever() One client can access the server as follows:: >>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass ... - >>> QueueManager.register('getQueue') - >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), - >>> authkey='abracadabra') - >>> queue = m.getQueue() - >>> queue.put('hello') + >>> QueueManager.register('get_queue') + >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') + >>> m.connect() + >>> q = m.get_queue() + >>> q.put('hello') Another client can also use it:: >>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass ... - >>> QueueManager.register('getQueue') - >>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='abracadabra') - >>> queue = m.getQueue() - >>> queue.get() + >>> QueueManager.register('get_queue') + >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') + >>> m.connect() + >>> q = m.get_queue() + >>> q.get() 'hello' @@ -1310,9 +1300,9 @@ >>> from multiprocessing import Manager >>> manager = Manager() >>> l = manager.list([i*i for i in range(10)]) - >>> print l + >>> print(l) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] - >>> print repr(l) + >>> print(repr(l)) >>> l[4] 16 @@ -1331,10 +1321,10 @@ >>> a = manager.list() >>> b = manager.list() >>> a.append(b) # referent of a now contains referent of b - >>> print a, b + >>> print(a, b) [[]] [] >>> b.append('hello') - >>> print a, b + >>> print(a, b) [['hello']] ['hello'] .. note:: @@ -1351,13 +1341,13 @@ Proxy objects are instances of subclasses of :class:`BaseProxy`. - .. method:: _call_method(methodname[, args[, kwds]]) + .. method:: _callmethod(methodname[, args[, kwds]]) Call and return the result of a method of the proxy's referent. If ``proxy`` is a proxy whose referent is ``obj`` then the expression :: - proxy._call_method(methodname, args, kwds) + proxy._callmethod(methodname, args, kwds) will evaluate the expression :: @@ -1370,9 +1360,9 @@ argument of :meth:`BaseManager.register`. If an exception is raised by the call, then then is re-raised by - :meth:`_call_method`. If some other exception is raised in the manager's + :meth:`_callmethod`. If some other exception is raised in the manager's process then this is converted into a :exc:`RemoteError` exception and is - raised by :meth:`_call_method`. + raised by :meth:`_callmethod`. Note in particular that an exception will be raised if *methodname* has not been *exposed* @@ -1380,16 +1370,16 @@ An example of the usage of :meth:`_call_method`:: >>> l = manager.list(range(10)) - >>> l._call_method('__len__') + >>> l._callmethod('__len__') 10 - >>> l._call_method('__getslice__', (2, 7)) # equiv to `l[2:7]` + >>> l._callmethod('__getitem__', (slice(2, 7), )) # equiv to `l[2:7]` [2, 3, 4, 5, 6] - >>> l._call_method('__getitem__', (20,)) # equiv to `l[20]` + >>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]` Traceback (most recent call last): ... IndexError: list index out of range - .. method:: _get_value() + .. method:: _getvalue() Return a copy of the referent. @@ -1436,8 +1426,8 @@ .. method:: apply(func[, args[, kwds]]) - Equivalent of the :func:`apply` builtin function. It blocks till the - result is ready. + Use of this method is equivalent to ``func(*args, **kwargs)``. It blocks + till the result is ready. .. method:: apply_async(func[, args[, kwds[, callback]]]) @@ -1450,8 +1440,8 @@ .. method:: map(func, iterable[, chunksize]) - A parallel equivalent of the :func:`map` builtin function. It blocks till - the result is ready. + A parallel equivalent of the ``list(map(func, iterable))`` call. It blocks + till the result is ready. This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these @@ -1468,7 +1458,7 @@ .. method:: imap(func, iterable[, chunksize]) - An equivalent of :func:`itertools.imap`. + An equivalent of :func:`map` builtin function. The *chunksize* argument is the same as the one used by the :meth:`.map` method. For very long iterables using a large value for *chunksize* can @@ -1508,7 +1498,7 @@ The class of the result returned by :meth:`Pool.apply_async` and :meth:`Pool.map_async`. - .. method:: get([timeout) + .. method:: get([timeout]) Return the result when it arrives. If *timeout* is not ``None`` and the result does not arrive within *timeout* seconds then @@ -1539,18 +1529,20 @@ pool = Pool(processes=4) # start 4 worker processes result = pool.applyAsync(f, (10,)) # evaluate "f(10)" asynchronously - print result.get(timeout=1) # prints "100" unless your computer is *very* slow + print(result.get(timeout=1)) # prints "100" unless your computer + # is *very* slow - print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]" + print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]" it = pool.imap(f, range(10)) - print it.next() # prints "0" - print it.next() # prints "1" - print it.next(timeout=1) # prints "4" unless your computer is *very* slow + print(it.__next()) # prints "0" + print(it.__next__()) # prints "1" + print(it.__next__(timeout=1)) # prints "4" unless your + # computer is *very* slow import time result = pool.applyAsync(time.sleep, (10,)) - print result.get(timeout=1) # raises TimeoutError + print(result.get(timeout=1)) # raises TimeoutError .. _multiprocessing-listeners-clients: @@ -1579,7 +1571,7 @@ then a welcome message is sent to the other end of the connection. Otherwise :exc:`AuthenticationError` is raised. -.. function:: answerChallenge(connection, authkey) +.. function:: answer_challenge(connection, authkey) Receive a message, calculate the digest of the message using *authkey* as the key, and then send the digest back. @@ -1587,7 +1579,7 @@ If a welcome message is not received, then :exc:`AuthenticationError` is raised. -.. function:: Client(address[, family[, authenticate[, authkey]]]) +.. function:: Client(address[, family[, authkey]]) Attempt to set up a connection to the listener which is using address *address*, returning a :class:`~multiprocessing.Connection`. @@ -1596,13 +1588,12 @@ generally be omitted since it can usually be inferred from the format of *address*. (See :ref:`multiprocessing-address-formats`) - If *authentication* is ``True`` or *authkey* is a string then digest - authentication is used. The key used for authentication will be either - *authkey* or ``current_process().get_auth_key()`` if *authkey* is ``None``. - If authentication fails then :exc:`AuthenticationError` is raised. See + If *authkey* is provided then digest authentication is used. If *authkey* is + ``None`` then ``current_process().get_auth_key()`` is used as authentication + key. If authentication fails then :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`. -.. class:: Listener([address[, family[, backlog[, authenticate[, authkey]]]]]) +.. class:: Listener([address[, family[, backlog[, authkey]]]]) A wrapper for a bound socket or Windows named pipe which is 'listening' for connections. @@ -1624,16 +1615,11 @@ If the listener object uses a socket then *backlog* (1 by default) is passed to the :meth:`listen` method of the socket once it has been bound. - If *authenticate* is ``True`` (``False`` by default) or *authkey* is not - ``None`` then digest authentication is used. - - If *authkey* is a string then it will be used as the authentication key; + If *authkey* is provided then it will be used as the authentication key; otherwise it must be *None*. - If *authkey* is ``None`` and *authenticate* is ``True`` then - ``current_process().get_auth_key()`` is used as the authentication key. If - *authkey* is ``None`` and *authentication* is ``False`` then no - authentication is done. If authentication fails then + If *authkey* is ``None`` then ``current_process().get_auth_key()`` is used as + the authentication key. If authentication fails then :exc:`AuthenticationError` is raised. See :ref:`multiprocessing-auth-keys`. .. method:: accept() @@ -1677,14 +1663,14 @@ from array import array address = ('localhost', 6000) # family is deduced to be 'AF_INET' - listener = Listener(address, authkey='secret password') + listener = Listener(address, authkey=b'secret password') conn = listener.accept() - print 'connection accepted from', listener.last_accepted + print('connection accepted from', listener.last_accepted) conn.send([2.25, None, 'junk', float]) - conn.send_bytes('hello') + conn.send_bytes(b'hello') conn.send_bytes(array('i', [42, 1729])) @@ -1698,15 +1684,15 @@ from array import array address = ('localhost', 6000) - conn = Client(address, authkey='secret password') - - print conn.recv() # => [2.25, None, 'junk', float] - - print conn.recv_bytes() # => 'hello' + conn = Client(address, authkey=b'secret password') + + print(conn.recv()) # => [2.25, None, 'junk', ] + + print(conn.recv_bytes()) # => b'hello' arr = array('i', [0, 0, 0, 0, 0]) - print conn.recv_bytes_into(arr) # => 8 - print arr # => array('i', [42, 1729, 0, 0, 0]) + print(conn.recv_bytes_into(arr)) # => 8 + print(arr) # => array('i', [42, 1729, 0, 0, 0]) conn.close() @@ -1741,10 +1727,10 @@ risk. Therefore :class:`Listener` and :func:`Client` use the :mod:`hmac` module to provide digest authentication. -An authentication key is a string which can be thought of as a password: once a -connection is established both ends will demand proof that the other knows the -authentication key. (Demonstrating that both ends are using the same key does -**not** involve sending the key over the connection.) +An authentication key is a byte string which can be thought of as a password: +once a connection is established both ends will demand proof that the other +knows the authentication key. (Demonstrating that both ends are using the same +key does **not** involve sending the key over the connection.) If authentication is requested but do authentication key is specified then the return value of ``current_process().get_auth_key`` is used (see @@ -1782,17 +1768,18 @@ Below is an example session with logging turned on:: - >>> import processing, logging - >>> logger = processing.getLogger() - >>> logger.setLevel(logging.INFO) - >>> logger.warning('doomed') - [WARNING/MainProcess] doomed - >>> m = processing.Manager() - [INFO/SyncManager-1] child process calling self.run() - [INFO/SyncManager-1] manager bound to '\\\\.\\pipe\\pyc-2776-0-lj0tfa' + >>> import multiprocessing, logging + >>> logging.basicConfig(level=logging.INFO) + >>> l = multiprocessing.get_logger() + >>> l.warning('doomed') + WARNING:multiprocessing:doomed + >>> m = multiprocessing.Manager() + INFO:multiprocessing:child process calling self.run() + INFO:multiprocessing:created temp directory /tmp/pymp-xG2EIt + INFO:multiprocessing:manager serving at '/tmp/pymp-xG2EIt/listener-OhsDEA' >>> del m - [INFO/MainProcess] sending shutdown message to manager - [INFO/SyncManager-1] manager exiting with exitcode 0 + INFO:multiprocessing:sending shutdown message to manager + INFO:multiprocessing:manager exiting with exitcode 0 The :mod:`multiprocessing.dummy` module @@ -1885,11 +1872,11 @@ q.put('X' * 1000000) if __name__ == '__main__': - queue = Queue() - p = Process(target=f, args=(queue,)) + q = Queue() + p = Process(target=f, args=(q,)) p.start() p.join() # this deadlocks - obj = queue.get() + obj = q.get() A fix here would be to swap the last two lines round (or simply remove the ``p.join()`` line). @@ -1967,7 +1954,7 @@ from multiprocessing import Process def foo(): - print 'hello' + print('hello') p = Process(target=foo) p.start() @@ -1978,7 +1965,7 @@ from multiprocessing import Process, freeze_support def foo(): - print 'hello' + print('hello') if __name__ == '__main__': freeze_support()