# HG changeset patch # Parent b31ea7995214531c997898b7f1c8d840fd39d901 Implement subset of multiprocessing.sharedctypes using memoryview ctypes is unavailable on some platforms, so making multiprocessing's shared memory depend on it is problematic. This patch adds RawValue, RawArray, Value and Array callables to multiprocessing.heap. If the "typecode" used is a letter rather than a ctypes type, then these are almost equivalent to the callables of the same name in multiprocessing.sharedctypes. diff -r b31ea7995214 Lib/multiprocessing/heap.py --- a/Lib/multiprocessing/heap.py Tue May 29 12:01:47 2012 +0100 +++ b/Lib/multiprocessing/heap.py Tue May 29 15:28:59 2012 +0100 @@ -13,12 +13,17 @@ import sys import threading import itertools +import weakref +import array +import struct import _multiprocessing from multiprocessing.util import Finalize, info -from multiprocessing.forking import assert_spawning +from multiprocessing.forking import assert_spawning, ForkingPickler +from multiprocessing import RLock -__all__ = ['BufferWrapper'] + +__all__ = ['RawArray', 'RawValue', 'Array', 'Value'] # # Inheirtable class which wraps an mmap, and from which blocks can be allocated @@ -221,3 +226,139 @@ def create_memoryview(self): (arena, start, stop), size = self._state return memoryview(arena.buffer)[start:start+size] + +# +# Alternative to sharedctypes.RawArray using memoryview +# +# We maintain a mapping from ids to buffer wrappers. This ensures +# that the wrapper is not garbage collected before the memoryview, and +# that we can pickle the memoryview in terms of the wrapper. +# + +def RawArray(format, size_or_initializer): + ''' + Return a memoryview object allocated from a shared mmap + ''' + if isinstance(size_or_initializer, int): + init = b'\x00' * (struct.calcsize(format) * size_or_initializer) + else: + fmt = format if format != 'c' else 'B' # array does not support 'c' + init = memoryview(array.array(fmt, size_or_initializer)).cast('B') + wrapper = BufferWrapper(len(init)) + arr = _rebuild_memoryview(wrapper, format) + arr.cast('B')[:] = init + return arr + +_id_to_wrapper = {} + +def _rebuild_memoryview(wrapper, format): + buf = wrapper.create_memoryview().cast(format) + ident = id(buf) + def callback(wr, _id_to_wrapper=_id_to_wrapper): + del _id_to_wrapper[ident] + wr = weakref.ref(buf, callback) + _id_to_wrapper[ident] = (wrapper, wr) + return buf + +def _reduce_memoryview(m): + try: + wrapper, wr = _id_to_wrapper[id(m)] + except KeyError: + return m.__reduce__() + return _rebuild_memoryview, (wrapper, m.format) + +ForkingPickler.register(memoryview, _reduce_memoryview) + +# +# Alternatives to sharedctypes.(RawValue|Value|Array) +# + +class RawValue(object): + ''' + Shared value stored in a shared mmap + ''' + def __init__(self, format, value=None): + self._buf = RawArray(format, 1) + if value is not None: + self.value = value + + @property + def value(self): + return self._buf[0] + + @value.setter + def value(self, value): + self._buf[0] = value + + +def Value(format, value=None, *, lock=True): + ''' + Return a synchronization wrapper for a RawValue + ''' + obj = RawValue(format, value) + if lock == False: + return obj + if lock in (True, None): + lock = RLock() + if not hasattr(lock, 'acquire'): + raise AttributeError("'%r' has no method 'acquire'" % lock) + return SynchronizedValue(obj, lock) + + +def Array(format, size_or_initializer, *, lock=True): + ''' + Return a synchronization wrapper for a RawArray + ''' + obj = RawArray(format, size_or_initializer) + if lock == False: + return obj + if lock in (True, None): + lock = RLock() + if not hasattr(lock, 'acquire'): + raise AttributeError("'%r' has no method 'acquire'" % lock) + return SynchronizedArray(obj, lock) + +# +# Synchronized wrapper classes +# + +class SynchronizedBase(object): + + def __init__(self, obj, lock=None): + self._obj = obj + self._lock = lock or RLock() + + def __reduce__(self): + assert_spawning(self) + return type(self), (self._obj, self._lock) + + def get_obj(self): + return self._obj + + def get_lock(self): + return self._lock + + def __repr__(self): + return '<%s wrapper for %s>' % (type(self).__name__, self._obj) + + +class SynchronizedValue(SynchronizedBase): + @property + def value(self): + with self._lock: + return self._obj.value + @value.setter + def value(self, value): + with self._lock: + self._obj.value = value + + +class SynchronizedArray(SynchronizedBase): + def __len__(self): + return len(self._obj) + def __getitem__(self, i): + with self._lock: + return self._obj[i] + def __setitem__(self, i, value): + with self._lock: + self._obj[i] = value diff -r b31ea7995214 Lib/multiprocessing/sharedctypes.py --- a/Lib/multiprocessing/sharedctypes.py Tue May 29 12:01:47 2012 +0100 +++ b/Lib/multiprocessing/sharedctypes.py Tue May 29 15:28:59 2012 +0100 @@ -95,14 +95,15 @@ return new_obj def synchronized(obj, lock=None): - assert not isinstance(obj, SynchronizedBase), 'object already synchronized' + assert not isinstance(obj, heap.SynchronizedBase), \ + 'object already synchronized' if isinstance(obj, ctypes._SimpleCData): - return Synchronized(obj, lock) + return heap.SynchronizedValue(obj, lock) elif isinstance(obj, ctypes.Array): if obj._type_ is ctypes.c_char: return SynchronizedString(obj, lock) - return SynchronizedArray(obj, lock) + return heap.SynchronizedArray(obj, lock) else: cls = type(obj) try: @@ -110,8 +111,12 @@ except KeyError: names = [field[0] for field in cls._fields_] d = dict((name, make_property(name)) for name in names) + def __reduce__(self): + return synchronized, (self._obj, self._lock) + d['__reduce__'] = __reduce__ classname = 'Synchronized' + cls.__name__ - scls = class_cache[cls] = type(classname, (SynchronizedBase,), d) + scls = class_cache[cls] = type(classname, + (heap.SynchronizedBase,), d) return scls(obj, lock) # @@ -149,17 +154,11 @@ template = ''' def get%s(self): - self.acquire() - try: + with self._lock: return self._obj.%s - finally: - self.release() def set%s(self, value): - self.acquire() - try: + with self._lock: self._obj.%s = value - finally: - self.release() %s = property(get%s, set%s) ''' @@ -167,69 +166,9 @@ class_cache = weakref.WeakKeyDictionary() # -# Synchronized wrappers +# # -class SynchronizedBase(object): - - def __init__(self, obj, lock=None): - self._obj = obj - self._lock = lock or RLock() - self.acquire = self._lock.acquire - self.release = self._lock.release - - def __reduce__(self): - assert_spawning(self) - return synchronized, (self._obj, self._lock) - - def get_obj(self): - return self._obj - - def get_lock(self): - return self._lock - - def __repr__(self): - return '<%s wrapper for %s>' % (type(self).__name__, self._obj) - - -class Synchronized(SynchronizedBase): - value = make_property('value') - - -class SynchronizedArray(SynchronizedBase): - - def __len__(self): - return len(self._obj) - - def __getitem__(self, i): - self.acquire() - try: - return self._obj[i] - finally: - self.release() - - def __setitem__(self, i, value): - self.acquire() - try: - self._obj[i] = value - finally: - self.release() - - def __getslice__(self, start, stop): - self.acquire() - try: - return self._obj[start:stop] - finally: - self.release() - - def __setslice__(self, start, stop, values): - self.acquire() - try: - self._obj[start:stop] = values - finally: - self.release() - - -class SynchronizedString(SynchronizedArray): +class SynchronizedString(heap.SynchronizedArray): value = make_property('value') raw = make_property('raw') diff -r b31ea7995214 Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Tue May 29 12:01:47 2012 +0100 +++ b/Lib/test/test_multiprocessing.py Tue May 29 15:28:59 2012 +0100 @@ -1002,7 +1002,17 @@ # # -class _TestValue(BaseTestCase): +class BaseSharedHeap: + shared = multiprocessing.heap + +class BaseSharedCtypes: + shared = multiprocessing + +# +# +# + +class BaseTestValue(BaseTestCase): ALLOWED_TYPES = ('processes',) @@ -1013,22 +1023,17 @@ ('c', latin('x'), latin('y')) ] - def setUp(self): - if not HAS_SHAREDCTYPES: - self.skipTest("requires multiprocessing.sharedctypes") - @classmethod def _test(cls, values): for sv, cv in zip(values, cls.codes_values): sv.value = cv[2] - def test_value(self, raw=False): if raw: - values = [self.RawValue(code, value) + values = [self.shared.RawValue(code, value) for code, value, _ in self.codes_values] else: - values = [self.Value(code, value) + values = [self.shared.Value(code, value) for code, value, _ in self.codes_values] for sv, cv in zip(values, self.codes_values): @@ -1046,32 +1051,44 @@ self.test_value(raw=True) def test_getobj_getlock(self): - val1 = self.Value('i', 5) + val1 = self.shared.Value('i', 5) lock1 = val1.get_lock() obj1 = val1.get_obj() - val2 = self.Value('i', 5, lock=None) + val2 = self.shared.Value('i', 5, lock=None) lock2 = val2.get_lock() obj2 = val2.get_obj() lock = self.Lock() - val3 = self.Value('i', 5, lock=lock) + val3 = self.shared.Value('i', 5, lock=lock) lock3 = val3.get_lock() obj3 = val3.get_obj() self.assertEqual(lock, lock3) - arr4 = self.Value('i', 5, lock=False) + arr4 = self.shared.Value('i', 5, lock=False) self.assertFalse(hasattr(arr4, 'get_lock')) self.assertFalse(hasattr(arr4, 'get_obj')) - self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') - - arr5 = self.RawValue('i', 5) + self.assertRaises(AttributeError, self.shared.Value, + 'i', 5, lock='navalue') + + arr5 = self.shared.RawValue('i', 5) self.assertFalse(hasattr(arr5, 'get_lock')) self.assertFalse(hasattr(arr5, 'get_obj')) -class _TestArray(BaseTestCase): +@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') +class _TestSharedCtypesValue(BaseTestValue, BaseSharedCtypes): + pass + +class _TestHeapValue(BaseTestValue, BaseSharedHeap): + pass + +# +# +# + +class BaseTestArray(BaseTestCase): ALLOWED_TYPES = ('processes',) @@ -1080,13 +1097,12 @@ for i in range(1, len(seq)): seq[i] += seq[i-1] - @unittest.skipIf(c_int is None, "requires _ctypes") def test_array(self, raw=False): seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] if raw: - arr = self.RawArray('i', seq) + arr = self.shared.RawArray('i', seq) else: - arr = self.Array('i', seq) + arr = self.shared.Array('i', seq) self.assertEqual(len(arr), len(seq)) self.assertEqual(arr[3], seq[3]) @@ -1113,10 +1129,10 @@ # of previously allocated non-zero memory being used for the new array # on the 2nd and 3rd loops. for _ in range(3): - arr = self.Array('i', size) + arr = self.shared.Array('i', size) self.assertEqual(len(arr), size) self.assertEqual(list(arr), [0] * size) - arr[:] = range(10) + arr[:] = array.array('i', range(10)) self.assertEqual(list(arr), list(range(10))) del arr @@ -1126,30 +1142,37 @@ @unittest.skipIf(c_int is None, "requires _ctypes") def test_getobj_getlock_obj(self): - arr1 = self.Array('i', list(range(10))) + arr1 = self.shared.Array('i', list(range(10))) lock1 = arr1.get_lock() obj1 = arr1.get_obj() - arr2 = self.Array('i', list(range(10)), lock=None) + arr2 = self.shared.Array('i', list(range(10)), lock=None) lock2 = arr2.get_lock() obj2 = arr2.get_obj() lock = self.Lock() - arr3 = self.Array('i', list(range(10)), lock=lock) + arr3 = self.shared.Array('i', list(range(10)), lock=lock) lock3 = arr3.get_lock() obj3 = arr3.get_obj() self.assertEqual(lock, lock3) - arr4 = self.Array('i', range(10), lock=False) + arr4 = self.shared.Array('i', range(10), lock=False) self.assertFalse(hasattr(arr4, 'get_lock')) self.assertFalse(hasattr(arr4, 'get_obj')) self.assertRaises(AttributeError, - self.Array, 'i', range(10), lock='notalock') - - arr5 = self.RawArray('i', range(10)) + self.shared.Array, 'i', range(10), lock='notalock') + + arr5 = self.shared.RawArray('i', range(10)) self.assertFalse(hasattr(arr5, 'get_lock')) self.assertFalse(hasattr(arr5, 'get_obj')) +@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') +class _TestSharedCtypesArray(BaseTestArray, BaseSharedCtypes): + pass + +class _TestHeapArray(BaseTestArray, BaseSharedHeap): + pass + # # #