Index: Lib/multiprocessing/reduction.py =================================================================== --- Lib/multiprocessing/reduction.py (revision 64415) +++ Lib/multiprocessing/reduction.py (working copy) @@ -13,11 +13,10 @@ import sys import socket import threading -import copy_reg import _multiprocessing from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close +from multiprocessing.forking import Popen, duplicate, close, ForkingPickler from multiprocessing.util import register_after_fork, debug, sub_debug from multiprocessing.connection import Client, Listener @@ -134,7 +133,7 @@ return new_handle # -# Register `_multiprocessing.Connection` with `copy_reg` +# Register `_multiprocessing.Connection` with `ForkingPickler` # def reduce_connection(conn): @@ -147,10 +146,10 @@ handle, readable=readable, writable=writable ) -copy_reg.pickle(_multiprocessing.Connection, reduce_connection) +ForkingPickler.register(_multiprocessing.Connection, reduce_connection) # -# Register `socket.socket` with `copy_reg` +# Register `socket.socket` with `ForkingPickler` # def fromfd(fd, family, type_, proto=0): @@ -169,10 +168,10 @@ close(fd) return _sock -copy_reg.pickle(socket.socket, reduce_socket) +ForkingPickler.register(socket.socket, reduce_socket) # -# Register `_multiprocessing.PipeConnection` with `copy_reg` +# Register `_multiprocessing.PipeConnection` with `ForkingPickler` # if sys.platform == 'win32': @@ -187,4 +186,4 @@ handle, readable=readable, writable=writable ) - copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection) + ForkingPickler.register(_multiprocessing.PipeConnection, reduce_pipe_connection) Index: Lib/multiprocessing/managers.py =================================================================== --- Lib/multiprocessing/managers.py (revision 64415) +++ Lib/multiprocessing/managers.py (working copy) @@ -18,13 +18,12 @@ import weakref import threading import array -import copy_reg import Queue from traceback import format_exc from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString -from multiprocessing.forking import exit, Popen, assert_spawning +from multiprocessing.forking import exit, Popen, assert_spawning, ForkingPickler from multiprocessing.util import Finalize, info try: @@ -47,14 +46,14 @@ def reduce_array(a): return array.array, (a.typecode, a.tostring()) -copy_reg.pickle(array.array, reduce_array) +ForkingPickler.register(array.array, reduce_array) view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] if view_types[0] is not list: # XXX only needed in Py3.0 def rebuild_as_list(obj): return list, (list(obj),) for view_type in view_types: - copy_reg.pickle(view_type, rebuild_as_list) + ForkingPickler.register(view_type, rebuild_as_list) # # Type for identifying shared objects Index: Lib/multiprocessing/util.py =================================================================== --- Lib/multiprocessing/util.py (revision 64415) +++ Lib/multiprocessing/util.py (working copy) @@ -8,7 +8,6 @@ import itertools import weakref -import copy_reg import atexit import threading # we want threading to install it's # cleanup function before multiprocessing does @@ -303,34 +302,3 @@ def __reduce__(self): return type(self), () -# -# Try making some callable types picklable -# - -def _reduce_method(m): - if m.im_self is None: - return getattr, (m.im_class, m.im_func.func_name) - else: - return getattr, (m.im_self, m.im_func.func_name) -copy_reg.pickle(type(Finalize.__init__), _reduce_method) - -def _reduce_method_descriptor(m): - return getattr, (m.__objclass__, m.__name__) -copy_reg.pickle(type(list.append), _reduce_method_descriptor) -copy_reg.pickle(type(int.__add__), _reduce_method_descriptor) - -def _reduce_builtin_function_or_method(m): - return getattr, (m.__self__, m.__name__) -copy_reg.pickle(type(list().append), _reduce_builtin_function_or_method) -copy_reg.pickle(type(int().__add__), _reduce_builtin_function_or_method) - -try: - from functools import partial -except ImportError: - pass -else: - def _reduce_partial(p): - return _rebuild_partial, (p.func, p.args, p.keywords or {}) - def _rebuild_partial(func, args, keywords): - return partial(func, *args, **keywords) - copy_reg.pickle(partial, _reduce_partial) Index: Lib/multiprocessing/sharedctypes.py =================================================================== --- Lib/multiprocessing/sharedctypes.py (revision 64415) +++ Lib/multiprocessing/sharedctypes.py (working copy) @@ -9,10 +9,9 @@ import sys import ctypes import weakref -import copy_reg from multiprocessing import heap, RLock -from multiprocessing.forking import assert_spawning +from multiprocessing.forking import assert_spawning, ForkingPickler __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized'] @@ -127,8 +126,7 @@ def rebuild_ctype(type_, wrapper, length): if length is not None: type_ = type_ * length - if sys.platform == 'win32' and type_ not in copy_reg.dispatch_table: - copy_reg.pickle(type_, reduce_ctype) + ForkingPickler.register(type_, reduce_ctype) obj = type_.from_address(wrapper.get_address()) obj._wrapper = wrapper return obj Index: Lib/multiprocessing/forking.py =================================================================== --- Lib/multiprocessing/forking.py (revision 64415) +++ Lib/multiprocessing/forking.py (working copy) @@ -12,7 +12,7 @@ from multiprocessing import util, process -__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close'] +__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close', 'ForkingPickler'] # # Check that the current thread is spawning a child process @@ -26,6 +26,49 @@ ) # +# Try making some callable types picklable +# + +from pickle import Pickler +class ForkingPickler(Pickler): + dispatch = Pickler.dispatch.copy() + + @classmethod + def register(cls, type, reduce): + def dispatcher(self, obj): + rv = reduce(obj) + self.save_reduce(obj=obj, *rv) + cls.dispatch[type] = dispatcher + +def _reduce_method(m): + if m.im_self is None: + return getattr, (m.im_class, m.im_func.func_name) + else: + return getattr, (m.im_self, m.im_func.func_name) +ForkingPickler.register(type(ForkingPickler.save), _reduce_method) + +def _reduce_method_descriptor(m): + return getattr, (m.__objclass__, m.__name__) +ForkingPickler.register(type(list.append), _reduce_method_descriptor) +ForkingPickler.register(type(int.__add__), _reduce_method_descriptor) + +#def _reduce_builtin_function_or_method(m): +# return getattr, (m.__self__, m.__name__) +#ForkingPickler.register(type(list().append), _reduce_builtin_function_or_method) +#ForkingPickler.register(type(int().__add__), _reduce_builtin_function_or_method) + +try: + from functools import partial +except ImportError: + pass +else: + def _reduce_partial(p): + return _rebuild_partial, (p.func, p.args, p.keywords or {}) + def _rebuild_partial(func, args, keywords): + return partial(func, *args, **keywords) + ForkingPickler.register(partial, _reduce_partial) + +# # Unix # @@ -105,17 +148,19 @@ import thread import msvcrt import _subprocess - import copy_reg import time from ._multiprocessing import win32, Connection, PipeConnection from .util import Finalize - try: - from cPickle import dump, load, HIGHEST_PROTOCOL - except ImportError: - from pickle import dump, load, HIGHEST_PROTOCOL + #try: + # from cPickle import dump, load, HIGHEST_PROTOCOL + #except ImportError: + from pickle import load, HIGHEST_PROTOCOL + def dump(obj, file, protocol=None): + ForkingPickler(file, protocol).dump(obj) + # # # @@ -346,10 +391,9 @@ return type(conn), (Popen.duplicate_for_child(conn.fileno()), conn.readable, conn.writable) - copy_reg.pickle(Connection, reduce_connection) - copy_reg.pickle(PipeConnection, reduce_connection) + ForkingPickler.register(Connection, reduce_connection) + ForkingPickler.register(PipeConnection, reduce_connection) - # # Prepare current process #