diff -r 366c19bb5ca2 Lib/multiprocessing/__init__.py
--- a/Lib/multiprocessing/__init__.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/multiprocessing/__init__.py	Thu Oct 04 18:22:10 2012 +0100
@@ -22,7 +22,7 @@
     'Process', 'current_process', 'active_children', 'freeze_support',
     'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
     'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
-    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
+    'Lock', 'RLock', 'RWLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
     'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
     'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
     ]
@@ -193,6 +193,13 @@
     from multiprocessing.synchronize import Barrier
     return Barrier(parties, action, timeout)
 
+def RWLock():
+    '''
+    Returns a RWLock object
+    '''
+    from multiprocessing.synchronize import RWLock
+    return RWLock()
+
 def Queue(maxsize=0):
     '''
     Returns a queue object
diff -r 366c19bb5ca2 Lib/multiprocessing/dummy/__init__.py
--- a/Lib/multiprocessing/dummy/__init__.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/multiprocessing/dummy/__init__.py	Thu Oct 04 18:22:10 2012 +0100
@@ -35,7 +35,7 @@
 __all__ = [
     'Process', 'current_process', 'active_children', 'freeze_support',
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
-    'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
+    'Event', 'Barrier', 'RWLock', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
     ]
 
 #
@@ -49,7 +49,7 @@
 
 from multiprocessing.dummy.connection import Pipe
 from threading import Lock, RLock, Semaphore, BoundedSemaphore
-from threading import Event, Condition, Barrier
+from threading import Event, Condition, Barrier, RWLock
 from queue import Queue
 
 #
diff -r 366c19bb5ca2 Lib/multiprocessing/managers.py
--- a/Lib/multiprocessing/managers.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/multiprocessing/managers.py	Thu Oct 04 18:22:10 2012 +0100
@@ -1016,6 +1016,29 @@
         return self._callmethod('__getattribute__', ('broken',))
 
 
+class _RWLockCoreProxy(BaseProxy):
+    _exposed_ = ('acquire_read', 'acquire_write', 'release', '_is_owned',
+                 '_release_save', '_acquire_restore')
+    def acquire_read(self, timeout=None):
+        return self._callmethod('acquire_read', (timeout,))
+    def acquire_write(self, timeout=None):
+        return self._callmethod('acquire_write', (timeout,))
+    def release(self):
+        return self._callmethod('release')
+    def _is_owned(self):
+        return self._callmethod('_is_owned')
+    def _release_save(self):
+        return self._callmethod('_release_save')
+    def _acquire_restore(self, x):
+        return self._callmethod('_acquire_restore', (x,))
+
+class RWLockProxyWrapper(threading.RWLock):
+    core = None
+    def __init__(self, core):
+        self._reader_lock = threading._ReaderLock(core)
+        self._writer_lock = threading._WriterLock(core)
+
+
 class NamespaceProxy(BaseProxy):
     _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')
     def __getattr__(self, key):
@@ -1118,3 +1141,10 @@
 # types returned by methods of PoolProxy
 SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
 SyncManager.register('AsyncResult', create_method=False)
+
+# SyncManager.RWLock() returns an instance of RWLock with a proxied core
+SyncManager.register('_RWLockCore', threading._RWLockCore, _RWLockCoreProxy)
+def RWLock(self):
+    core = self._RWLockCore()
+    return RWLockProxyWrapper(core)
+SyncManager.RWLock = RWLock
diff -r 366c19bb5ca2 Lib/multiprocessing/synchronize.py
--- a/Lib/multiprocessing/synchronize.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/multiprocessing/synchronize.py	Thu Oct 04 18:22:10 2012 +0100
@@ -18,6 +18,7 @@
 from multiprocessing.process import current_process
 from multiprocessing.util import register_after_fork, debug
 from multiprocessing.forking import assert_spawning, Popen
+from multiprocessing.sharedctypes import RawValue
 from time import time as _time
 
 # Try to import the mp.synchronize module cleanly, if it fails
@@ -373,3 +374,47 @@
     @_count.setter
     def _count(self, value):
         self._array[1] = value
+
+#
+# RWLock
+#
+
+# This lock reuses the threading._RWLockCore, replacing important internal variables
+# with process-shared ones.
+class _RWLockCore(threading._RWLockCore):
+    # Overwrite condition and state with shared variables.
+    # The internal 'owning' does not need to be shared, since
+    # it is used only to test the presence of the 'current' thread
+    # in it, and thus needs to be process local only.
+    def __init__(self):
+        self._state = RawValue("i", 0)
+        self._waiting = RawValue("i", 0)
+        super().__init__()
+        self.cond = Condition()
+        if sys.platform != 'win32':
+            # After a fork a new thread might be created with an id
+            # which was in self.owning at the time of the fork.
+            # Therefore we clear self.owning.
+            register_after_fork(self, _RWLockCore._clear_owning)
+
+    @staticmethod
+    def _clear_owning(lock):
+        lock.owning[:] = []
+
+    @property
+    def state(self):
+        return self._state.value
+    @state.setter
+    def state(self, value):
+        self._state.value = value
+
+    @property
+    def waiting(self):
+        return self._waiting.value
+    @waiting.setter
+    def waiting(self, value):
+        self._waiting.value = value
+
+# Specialization, instantiating our special process-safe _RWLockCore
+class RWLock(threading.RWLock):
+    core = _RWLockCore
diff -r 366c19bb5ca2 Lib/test/lock_tests.py
--- a/Lib/test/lock_tests.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/test/lock_tests.py	Thu Oct 04 18:22:10 2012 +0100
@@ -874,3 +874,127 @@
         b = self.barriertype(1)
         b.wait()
         b.wait()
+
+class RWLockTests(RLockTests):
+    """
+    Tests for RWLock objects
+    """
+    def test_many_readers(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = []
+        nlocked = []
+        def f():
+            with lock.reader_lock:
+                locked.append(1)
+                _wait()
+                nlocked.append(len(locked))
+                _wait()
+                locked.pop(-1)
+        Bunch(f, N).wait_for_finished()
+        self.assertTrue(max(nlocked) > 1)
+
+    def test_reader_recursion(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = []
+        nlocked = []
+        def f():
+            with lock.reader_lock:
+                with lock.reader_lock:
+                    locked.append(1)
+                    _wait()
+                    nlocked.append(len(locked))
+                    _wait()
+                    locked.pop(-1)
+        Bunch(f, N).wait_for_finished()
+        self.assertTrue(max(nlocked) > 1)
+
+    def test_writer_recursion(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = []
+        nlocked = []
+        def f():
+            with lock.writer_lock:
+                with lock.writer_lock:
+                    locked.append(1)
+                    _wait()
+                    nlocked.append(len(locked))
+                    _wait()
+                    locked.pop(-1)
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(max(nlocked), 1)
+
+    def test_writer_recursionfail(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = []
+        def f():
+            with lock.reader_lock:
+                self.assertRaises(RuntimeError, lock.writer_lock.acquire)
+                locked.append(1)
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(len(locked), N)
+
+    def test_readers_writers(self):
+        lock = self.rwlocktype()
+        N = 5
+        rlocked = []
+        wlocked = []
+        nlocked = []
+        def r():
+            with lock.reader_lock:
+                rlocked.append(1)
+                _wait()
+                nlocked.append((len(rlocked), len(wlocked)))
+                _wait()
+                rlocked.pop(-1)
+        def w():
+            with lock.writer_lock:
+                wlocked.append(1)
+                _wait()
+                nlocked.append((len(rlocked), len(wlocked)))
+                _wait()
+                wlocked.pop(-1)
+        b1 = Bunch(r, N)
+        b2 = Bunch(w, N)
+        b1.wait_for_finished()
+        b2.wait_for_finished()
+        r, w, = zip(*nlocked)
+        self.assertTrue(max(r) > 1)
+        self.assertEqual(max(w), 1)
+        for r, w in nlocked:
+            if w:
+                self.assertEqual(r, 0)
+            if r:
+                self.assertEqual(w, 0)
+
+    def test_writer_success(self):
+        """Verify that a writer can get access"""
+        lock = self.rwlocktype()
+        N = 5
+        reads = 0
+        writes = 0
+        def r():
+            # read until we achieve write successes
+            nonlocal reads, writes
+            while writes < 2:
+                with lock.reader_lock:
+                    reads += 1
+        def w():
+            nonlocal reads, writes
+            while reads == 0:
+                _wait()
+            for i in range(2):
+                _wait()
+                with lock.writer_lock:
+                    writes += 1
+
+        b1 = Bunch(r, N)
+        b2 = Bunch(w, 1)
+        b1.wait_for_finished()
+        b2.wait_for_finished()
+        self.assertEqual(writes, 2)
+        # uncomment this to view performance
+        #print(writes, reads)
diff -r 366c19bb5ca2 Lib/test/test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/test/test_multiprocessing.py	Thu Oct 04 18:22:10 2012 +0100
@@ -1090,6 +1090,20 @@
         with self._lock:
             return self._lengthbuf[0]
 
+    def pop(self, where=-1):
+        with self._lock:
+            self._lengthbuf[0] -= 1
+            return 0
+
+class _MaxList(_DummyList):
+    """This list is for storing the max value appended to it"""
+    def append(self, val):
+        with self._lock:
+            self._lengthbuf[0] = max(val, self._lengthbuf[0])
+
+    def __iter__(self):
+        yield self._lengthbuf[0]
+
 def _wait():
     # A crude wait/yield function not relying on synchronization primitives.
     time.sleep(0.01)
@@ -1396,6 +1410,182 @@
 #
 #
+class _TestRWLock(BaseTestCase):
+    """
+    Tests for RWLock objects
+    """
+    def rwlocktype(self):
+        return self.RWLock()
+
+    def DummyList(self):
+        if self.TYPE == 'threads':
+            return []
+        elif self.TYPE == 'manager':
+            return self.manager.list()
+        else:
+            return _DummyList()
+
+    def MaxList(self):
+        if self.TYPE == 'threads':
+            return []
+        elif self.TYPE == 'manager':
+            return self.manager.list()
+        else:
+            return _MaxList()
+
+    def Bunch(self, f, n, args=()):
+        return Bunch(self, f, args, n)
+
+    # must define these functions here. Closures are not supported for pickling.
+    @classmethod
+    def f1(self, lock, locked, nlocked):
+        with lock.reader_lock:
+            locked.append(1)
+            time.sleep(0.1)
+            nlocked.append(len(locked))
+            time.sleep(0.1)
+            locked.pop(-1)
+
+    def test_many_readers(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = self.DummyList()
+        nlocked = self.MaxList()
+        with lock.writer_lock:
+            bunch = self.Bunch(self.f1, N, (lock, locked, nlocked))
+            # wait till all reader processes are waiting for the lock
+            time.sleep(1)
+        bunch.wait_for_finished()
+        self.assertEqual(max(nlocked), N)
+
+    @classmethod
+    def f2(self, lock, locked, nlocked):
+        with lock.reader_lock:
+            with lock.reader_lock:
+                locked.append(1)
+                time.sleep(0.1)
+                nlocked.append(len(locked))
+                time.sleep(0.1)
+                locked.pop(-1)
+
+    def test_reader_recursion(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = self.DummyList()
+        nlocked = self.MaxList()
+        with lock.writer_lock:
+            bunch = self.Bunch(self.f2, N, (lock, locked, nlocked))
+            # wait till all reader processes are waiting for the lock
+            time.sleep(1)
+        bunch.wait_for_finished()
+        self.assertEqual(max(nlocked), N)
+
+    @classmethod
+    def f3(self, lock, locked, nlocked):
+        with lock.writer_lock:
+            with lock.writer_lock:
+                locked.append(1)
+                _wait()
+                nlocked.append(len(locked))
+                _wait()
+                locked.pop(-1)
+
+    def test_writer_recursion(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = self.DummyList()
+        nlocked = self.MaxList()
+        self.Bunch(self.f3, N, (lock, locked, nlocked)).wait_for_finished()
+        self.assertEqual(max(nlocked), 1)
+
+    @classmethod
+    def f4(self, lock, locked):
+        with lock.reader_lock:
+            self.assertRaises(RuntimeError, lock.writer_lock.acquire)
+            locked.append(1)
+
+    def test_writer_recursionfail(self):
+        lock = self.rwlocktype()
+        N = 5
+        locked = self.DummyList()
+        self.Bunch(self.f4, N, (lock, locked)).wait_for_finished()
+        self.assertEqual(len(locked), N)
+
+    @classmethod
+    def r5(self, lock, rlocked, wlocked, nlocked):
+        with lock.reader_lock:
+            rlocked.append(1)
+            _wait()
+            nlocked.append((len(rlocked), len(wlocked)))
+            _wait()
+            rlocked.pop(-1)
+
+    @classmethod
+    def w5(self, lock, rlocked, wlocked, nlocked):
+        with lock.writer_lock:
+            wlocked.append(1)
+            _wait()
+            nlocked.append((len(rlocked), len(wlocked)))
+            _wait()
+            wlocked.pop(-1)
+
+    def test_readers_writers(self):
+        lock = self.rwlocktype()
+        N = 5
+        rlocked = self.DummyList()
+        wlocked = self.DummyList()
+        nlocked = self.DummyList()
+        b1 = self.Bunch(self.r5, N, (lock, rlocked, wlocked, nlocked))
+        b2 = self.Bunch(self.w5, N, (lock, rlocked, wlocked, nlocked))
+        b1.wait_for_finished()
+        b2.wait_for_finished()
+        return  # the lists aren't real, can't really iterate over them.
+        r, w, = zip(*nlocked)
+        self.assertTrue(max(r) > 1)
+        self.assertEqual(max(w), 1)
+        for r, w in nlocked:
+            if w:
+                self.assertEqual(r, 0)
+            if r:
+                self.assertEqual(w, 0)
+
+    @classmethod
+    def r6(self, lock, reads, writes):
+        # read until we achieve write successes
+        while len(writes) < 2:
+            with lock.reader_lock:
+                reads.append(1)
+
+    @classmethod
+    def w6(self, lock, reads, writes):
+        while len(reads) == 0:
+            _wait()
+        for i in range(2):
+            _wait()
+            with lock.writer_lock:
+                writes.append(1)
+
+    def test_writer_success(self):
+        """Verify that a writer can get access"""
+        lock = self.rwlocktype()
+        N = 5
+        reads = self.DummyList()
+        writes = self.DummyList()
+        import time
+        t0 = time.clock()
+        b1 = self.Bunch(self.r6, N, (lock, reads, writes))
+        b2 = self.Bunch(self.w6, 1, (lock, reads, writes))
+        b1.wait_for_finished()
+        b2.wait_for_finished()
+        print("***************** completed in %ss" % (time.clock() - t0))
+        self.assertEqual(len(writes), 2)
+        # uncomment this to view performance
+        #print(writes, reads)
+
+#
+#
+#
+
 
 class _TestValue(BaseTestCase):
 
     ALLOWED_TYPES = ('processes',)
@@ -2923,7 +3113,7 @@
     Process = multiprocessing.Process
     locals().update(get_attributes(multiprocessing, (
         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
+        'Condition', 'Event', 'Barrier', 'RWLock', 'Value', 'Array', 'RawValue',
         'RawArray', 'current_process', 'active_children', 'Pipe',
         'connection', 'JoinableQueue', 'Pool'
         )))
@@ -2938,7 +3128,7 @@
     manager = object.__new__(multiprocessing.managers.SyncManager)
     locals().update(get_attributes(manager, (
         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
+        'Condition', 'Event', 'Barrier', 'RWLock', 'Value', 'Array', 'list', 'dict',
         'Namespace', 'JoinableQueue', 'Pool'
         )))
 
@@ -2951,7 +3141,7 @@
     Process = multiprocessing.dummy.Process
     locals().update(get_attributes(multiprocessing.dummy, (
         'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
-        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
+        'Condition', 'Event', 'Barrier', 'RWLock', 'Value', 'Array', 'current_process',
         'active_children', 'Pipe', 'connection', 'dict', 'list',
         'Namespace', 'JoinableQueue', 'Pool'
         )))
diff -r 366c19bb5ca2 Lib/test/test_threading.py
--- a/Lib/test/test_threading.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/test/test_threading.py	Thu Oct 04 18:22:10 2012 +0100
@@ -815,6 +815,22 @@
 class BarrierTests(lock_tests.BarrierTests):
     barriertype = staticmethod(threading.Barrier)
 
+class RWLockTests(lock_tests.RWLockTests):
+    rwlocktype = staticmethod(threading.RWLock)
+    def locktype(self):
+        return self.rwlocktype().writer_lock
+
+class RWConditionTests(lock_tests.ConditionTests):
+    rwlocktype = staticmethod(threading.RWLock)
+    def condtype(self, lock=None):
+        if lock:
+            return threading.Condition(lock)
+        return threading.Condition(self.rwlocktype().writer_lock)
+
+class RWConditionAsRLockTests(lock_tests.RLockTests):
+    rwlocktype = staticmethod(threading.RWLock)
+    def locktype(self):
+        return threading.Condition(self.rwlocktype().writer_lock)
 
 def test_main():
     test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests,
@@ -824,6 +840,9 @@
                               ThreadJoinOnShutdown,
                               ThreadingExceptionTests,
                               BarrierTests,
+                              RWLockTests,
+                              RWConditionTests,
+                              RWConditionAsRLockTests,
                               )
 
 if __name__ == "__main__":
diff -r 366c19bb5ca2 Lib/threading.py
--- a/Lib/threading.py	Thu Oct 04 16:07:03 2012 +0200
+++ b/Lib/threading.py	Thu Oct 04 18:22:10 2012 +0100
@@ -501,7 +501,171 @@
 #exception raised by the Barrier class
 class BrokenBarrierError(RuntimeError): pass
 
+# The internal lock object managing the RWLock state.
+class _RWLockCore(object):
+    def __init__(self):
+        self.cond = Condition()
+        self.state = 0  # positive is shared count, negative exclusive count
+        self.waiting = 0
+        self.owning = []  # threads will be few, so a list is not inefficient
+    # Acquire the lock in read mode.
+    def acquire_read(self, timeout=None):
+        with self.cond:
+            return self.cond.wait_for(self._acquire_read, timeout)
+
+    def _acquire_read(self):
+        if self.state < 0:
+            # lock is in write mode.  See if it is ours and we can recurse
+            return self._acquire_write()
+
+        # Implement "exclusive bias" giving exclusive lock priority.
+        me = get_ident()
+        if not self.waiting:
+            ok = True  # no exclusive acquires waiting.
+        else:
+            # Recursion must have the highest priority, otherwise we deadlock
+            ok = me in self.owning
+
+        if ok:
+            self.state += 1
+            self.owning.append(me)
+        return ok
+
+    # Acquire the lock in write mode.  A 'waiting' count is maintained,
+    # ensuring that 'readers' will yield to writers.
+    def acquire_write(self, timeout=None):
+        with self.cond:
+            self.waiting += 1
+            try:
+                return self.cond.wait_for(self._acquire_write, timeout)
+            finally:
+                self.waiting -= 1
+
+    def _acquire_write(self):
+        # we can only take the write lock if no one is there, or we already hold the lock
+        me = get_ident()
+        if self.state == 0 or (self.state < 0 and me in self.owning):
+            self.state -= 1
+            self.owning.append(me)
+            return True
+        if self.state > 0 and me in self.owning:
+            raise RuntimeError("cannot upgrade RWLock from read to write")
+        return False
+
+    # Release the lock
+    def release(self):
+        with self.cond:
+            me = get_ident()
+            try:
+                self.owning.remove(me)
+            except ValueError:
+                raise RuntimeError("cannot release an un-acquired lock")
+            if self.state > 0:
+                self.state -= 1
+            else:
+                self.state += 1
+            if self.state == 0:
+                self.cond.notify_all()
+
+    # Interface for condition variable.  Must hold an exclusive lock since the
+    # condition variable's state may be protected by the lock
+    def _is_owned(self):
+        return self.state < 0 and get_ident() in self.owning
+
+    def _release_save(self):
+        # In an exclusively locked state, get the recursion level and free the lock
+        with self.cond:
+            if get_ident() not in self.owning:
+                raise RuntimeError("cannot release an un-acquired lock")
+            r = self.owning
+            self.owning = []
+            self.state = 0
+            self.cond.notify_all()
+            return r
+
+    def _acquire_restore(self, x):
+        # Reclaim the exclusive lock at the old recursion level
+        self.acquire_write()
+        with self.cond:
+            self.owning = x
+            self.state = -len(x)
+
+# Lock objects to access the _RWLockCore in reader or writer mode
+class _ReaderLock:
+    def __init__(self, lock):
+        self.lock = lock
+
+    @staticmethod
+    def _timeout(blocking, timeout):
+        # A few sanity checks to satisfy the unittests.
+        if timeout < 0 and timeout != -1:
+            raise ValueError("invalid timeout")
+        if timeout > TIMEOUT_MAX:
+            raise OverflowError
+        if blocking:
+            return timeout if timeout >= 0 else None
+        if timeout > 0:
+            raise ValueError("can't specify a timeout when non-blocking")
+        return 0
+
+    def acquire(self, blocking=True, timeout=-1):
+        return self.lock.acquire_read(self._timeout(blocking, timeout))
+
+    def release(self):
+        self.lock.release()
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, exc, val, tb):
+        self.release()
+
+    def _is_owned(self):
+        raise TypeError("a reader lock cannot be used with a Condition")
+
+class _WriterLock(_ReaderLock):
+    def acquire(self, blocking=True, timeout=-1):
+        return self.lock.acquire_write(self._timeout(blocking, timeout))
+
+    def _is_owned(self):
+        return self.lock._is_owned()
+
+    def _release_save(self):
+        return self.lock._release_save()
+
+    def _acquire_restore(self, arg):
+        return self.lock._acquire_restore(arg)
+
+class RWLock():
+    # Doc shamelessly ripped off from Java
+    """
+    A RWLock maintains a pair of associated locks, one for read-only operations
+    and one for writing.  The read lock may be held simultaneously by multiple
+    reader threads, so long as there are no writers.  The write lock is exclusive.
+
+    """
+    core = _RWLockCore
+
+    def __init__(self):
+        core = self.core()
+        self._reader_lock = _ReaderLock(core)
+        self._writer_lock = _WriterLock(core)
+
+    @property
+    def reader_lock(self):
+        """
+        The lock used for read, or shared, access
+        """
+        return self._reader_lock
+
+    @property
+    def writer_lock(self):
+        """
+        The lock used for write, or exclusive, access
+        """
+        return self._writer_lock
+
 
 # Helper to generate new thread names
 _counter = 0
 def _newname(template="Thread-%d"):
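Usage sketch (not part of the patch): assuming the patch is applied as proposed, threading.RWLock() exposes a reader_lock and a writer_lock, both usable as context managers, and multiprocessing.RWLock() offers the same interface across processes. The cache/lookup/store names below are illustrative only.

    import threading

    cache = {}
    rw = threading.RWLock()

    def lookup(key):
        # Shared mode: any number of readers may hold reader_lock at once.
        with rw.reader_lock:
            return cache.get(key)

    def store(key, value):
        # Exclusive mode: writer_lock waits for all readers to release;
        # new readers yield to a waiting writer (the "exclusive bias" above).
        with rw.writer_lock:
            cache[key] = value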