diff -r 3f739f42be51 Lib/multiprocessing/__init__.py --- a/Lib/multiprocessing/__init__.py Mon Oct 01 07:18:55 2012 +0300 +++ b/Lib/multiprocessing/__init__.py Tue Oct 02 11:28:20 2012 +0000 @@ -22,7 +22,7 @@ 'Process', 'current_process', 'active_children', 'freeze_support', 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger', 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError', - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', + 'Lock', 'RLock', 'RWLock', 'FairRWLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', ] @@ -193,6 +193,20 @@ from multiprocessing.synchronize import Barrier return Barrier(parties, action, timeout) +def RWLock(): + ''' + Returns a RWLock object + ''' + from multiprocessing.synchronize import RWLock + return RWLock() + +def FairRWLock(): + ''' + Returns a RWLock object + ''' + from multiprocessing.synchronize import FairRWLock + return FairRWLock() + def Queue(maxsize=0): ''' Returns a queue object diff -r 3f739f42be51 Lib/multiprocessing/dummy/__init__.py --- a/Lib/multiprocessing/dummy/__init__.py Mon Oct 01 07:18:55 2012 +0300 +++ b/Lib/multiprocessing/dummy/__init__.py Tue Oct 02 11:28:20 2012 +0000 @@ -35,7 +35,7 @@ __all__ = [ 'Process', 'current_process', 'active_children', 'freeze_support', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', - 'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' + 'Event', 'Barrier', 'RWLock', 'FairRWLock', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue' ] # @@ -49,7 +49,7 @@ from multiprocessing.dummy.connection import Pipe from threading import Lock, RLock, Semaphore, BoundedSemaphore -from threading import Event, Condition, Barrier +from threading import Event, Condition, Barrier, RWLock, FairRWLock from queue import Queue # diff -r 3f739f42be51 Lib/multiprocessing/synchronize.py --- a/Lib/multiprocessing/synchronize.py Mon Oct 01 07:18:55 2012 +0300 +++ b/Lib/multiprocessing/synchronize.py Tue Oct 02 11:28:20 2012 +0000 @@ -18,6 +18,7 @@ from multiprocessing.process import current_process from multiprocessing.util import register_after_fork, debug from multiprocessing.forking import assert_spawning, Popen +from multiprocessing.sharedctypes import RawValue from time import time as _time # Try to import the mp.synchronize module cleanly, if it fails @@ -373,3 +374,190 @@ @_count.setter def _count(self, value): self._array[1] = value + +# +# RWLock +# + +class RWLock(threading.RWLockBase): + def __init__(self): + self.cond = Condition() + self._state = RawValue("i", 0) + self._owning = RawValue("i", 0) + + # Thread IDs are unique across the system + @staticmethod + def get_ident(): + return threading.get_ident() + + @property + def state(self): + return self._state.value + @state.setter + def state(self, value): + self._state.value = value + + @property + def owning(self): + return self._owning.value + @owning.setter + def owning(self, value): + self._owning.value = value + + def acquire_read(self, timeout=None): + """ + Acquire the lock in shared mode + """ + with self.cond: + return self.cond.wait_for(self._acquire_read, timeout) + + def acquire_write(self, timeout=None): + """ + Acquire the lock in exclusive mode + """ + with self.cond: + return self.cond.wait_for(self._acquire_write, timeout) + + def _acquire_write(self): + #we can only take the write lock if no one is there, or we already hold the lock + me = self.get_ident() + if self.state == 0 or (self.state < 0 and me == self.owning): + self.state -= 1 + self.owning = me + return True + return False + + def _acquire_read(self): + if self.state < 0: + # lock is in write mode. See if it is ours and we can recurse + return self._acquire_write() + self.state += 1 + return True + + def release(self): + """ + Release the lock + """ + with self.cond: + if self.state == 0: + raise RuntimeError("cannot release an un-acquired lock") + if self.state > 0: + self.state -= 1 + else: + self.state += 1 + if self.state == 0: + self.owning = 0 + self.cond.notify_all() + + # Interface for condition variable. Must hold an exclusive lock since the + # condition variable's state may be protected by the lock + def _is_owned(self): + return self.state < 0 and self.get_ident() == self.owning + + def _release_save(self): + # In a exlusively locked state, get the recursion level and free the lock + with self.cond: + if self.get_ident() != self.owning: + raise RuntimeError("cannot release an un-acquired lock") + r = self.state, self.owning + self.owning = 0 + self.state = 0 + self.cond.notify_all() + return r + + def _acquire_restore(self, x): + # Reclaim the exclusive lock at the old recursion level + self.acquire_write() + with self.cond: + self.state, self.owning = x + +class FairRWLock(threading.RWLockBase): + # Provide writer priority. Note that we cannot support recursion in this case. + # To provide writer priority, with recursion, we must still allow a reader to recurse + # (otherwise we would deadlock) + # and to do that, we must hold a list of currently owning threads. + # It's simpler to disallow recursion completely. + def __init__(self): + self.cond = Condition() + self._state = RawValue("i", 0) + self._waiting = RawValue("i", 0) + + @property + def state(self): + return self._state.value + @state.setter + def state(self, value): + self._state.value = value + + @property + def waiting(self): + return self._waiting.value + @waiting.setter + def waiting(self, value): + self._waiting.value = value + + def acquire_read(self, timeout=None): + """ + Acquire the lock in shared mode + """ + with self.cond: + return self.cond.wait_for(self._acquire_read, timeout) + + def acquire_write(self, timeout=None): + """ + Acquire the lock in exclusive mode + """ + with self.cond: + self.waiting += 1 + try: + return self.cond.wait_for(self._acquire_write, timeout) + finally: + self.waiting -= 1 + + def _acquire_write(self): + #we can only take the write lock if no one is there, or we already hold the lock + if self.state == 0: + self.state -= 1 + return True + return False + + def _acquire_read(self): + if self.state >= 0 and not self.waiting: + self.state += 1 + return True + return False + + def release(self): + """ + Release the lock + """ + with self.cond: + if self.state == 0: + raise RuntimeError("cannot release an un-acquired lock") + if self.state > 0: + self.state -= 1 + else: + self.state += 1 + if self.state == 0: + self.cond.notify_all() + + # Interface for condition variable. Must hold an exclusive lock since the + # condition variable's state may be protected by the lock + def _is_owned(self): + return self.state < 0 + + def _release_save(self): + # In a exlusively locked state, get the recursion level and free the lock + with self.cond: + if self.state >= 0: + raise RuntimeError("cannot release an un-acquired lock") + r = self.state + self.state = 0 + self.cond.notify_all() + return r + + def _acquire_restore(self, x): + # Reclaim the exclusive lock at the old recursion level + self.acquire_write() + with self.cond: + self.state = x diff -r 3f739f42be51 Lib/test/lock_tests.py --- a/Lib/test/lock_tests.py Mon Oct 01 07:18:55 2012 +0300 +++ b/Lib/test/lock_tests.py Tue Oct 02 11:28:20 2012 +0000 @@ -874,3 +874,127 @@ b = self.barriertype(1) b.wait() b.wait() + +class RWLockTests(RLockTests): + """ + Tests for RWLock objects + """ + def test_many_readers(self): + lock = self.rwlocktype() + N = 5 + locked = [] + nlocked = [] + def f(): + with lock.reader_lock: + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + Bunch(f, N).wait_for_finished() + self.assertTrue(max(nlocked) > 1) + + def test_reader_recursion(self): + lock = self.rwlocktype() + N = 5 + locked = [] + nlocked = [] + def f(): + with lock.reader_lock: + with lock.reader_lock: + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + Bunch(f, N).wait_for_finished() + self.assertTrue(max(nlocked) > 1) + + def test_writer_recursion(self): + lock = self.rwlocktype() + N = 5 + locked = [] + nlocked = [] + def f(): + with lock.writer_lock: + with lock.writer_lock: + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + Bunch(f, N).wait_for_finished() + self.assertEqual(max(nlocked), 1) + + def test_writer_recursionfail(self): + lock = self.rwlocktype() + N = 5 + locked = [] + def f(): + with lock.reader_lock: + self.assertRaises(RuntimeError, lock.acquire_write) + locked.append(1) + Bunch(f, N).wait_for_finished() + self.assertEqual(len(locked), N) + + def test_readers_writers(self): + lock = self.rwlocktype() + N = 5 + rlocked = [] + wlocked = [] + nlocked = [] + def r(): + with lock.reader_lock: + rlocked.append(1) + _wait() + nlocked.append((len(rlocked), len(wlocked))) + _wait() + rlocked.pop(-1) + def w(): + with lock.writer_lock: + wlocked.append(1) + _wait() + nlocked.append((len(rlocked), len(wlocked))) + _wait() + wlocked.pop(-1) + b1 = Bunch(r, N) + b2 = Bunch(w, N) + b1.wait_for_finished() + b2.wait_for_finished() + r, w, = zip(*nlocked) + self.assertTrue(max(r) > 1) + self.assertEqual(max(w), 1) + for r, w in nlocked: + if w: + self.assertEqual(r, 0) + if r: + self.assertEqual(w, 0) + + def test_writer_success(self): + """Verify that a writer can get access""" + lock = self.rwlocktype() + N = 5 + reads = 0 + writes = 0 + def r(): + # read until we achive write successes + nonlocal reads, writes + while writes < 2: + with lock.reader_lock: + reads += 1 + def w(): + nonlocal reads, writes + while reads == 0: + _wait() + for i in range(2): + _wait() + with lock.writer_lock: + writes += 1 + + b1 = Bunch(r, N) + b2 = Bunch(w, 1) + b1.wait_for_finished() + b2.wait_for_finished() + self.assertEqual(writes, 2) + # uncomment this to view performance + #print(writes, reads) diff -r 3f739f42be51 Lib/test/test_multiprocessing.py --- a/Lib/test/test_multiprocessing.py Mon Oct 01 07:18:55 2012 +0300 +++ b/Lib/test/test_multiprocessing.py Tue Oct 02 11:28:20 2012 +0000 @@ -1090,6 +1090,20 @@ with self._lock: return self._lengthbuf[0] + def pop(self, where=-1): + with self._lock: + self._lengthbuf[0] -= 1 + return 0 + +class _MaxList(_DummyList): + """This list is for storing the max value appended to it""" + def append(self, val): + with self._lock: + self._lengthbuf[0] = max(val, self._lengthbuf[0]) + + def __iter__(self): + yield self._lengthbuf[0] + def _wait(): # A crude wait/yield function not relying on synchronization primitives. time.sleep(0.01) @@ -1396,6 +1410,189 @@ # # +class _TestRWLock(BaseTestCase): + """ + Tests for RWLock objects + """ + ALLOWED_TYPES = ('processes', 'threads') + def rwlocktype(self): + return self.RWLock() + + def DummyList(self): + if self.TYPE == 'threads': + return [] + elif self.TYPE == 'manager': + return self.manager.list() + else: + return _DummyList() + + def MaxList(self): + if self.TYPE == 'threads': + return [] + elif self.TYPE == 'manager': + return self.manager.list() + else: + return _MaxList() + + def Bunch(self, f, n, args=()): + return Bunch(self, f, args, n) + + # must define these functions here. Closures are not supported for pickling. + @classmethod + def f1(self, lock, locked, nlocked): + with lock.reader_lock: + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + + def test_many_readers(self): + lock = self.rwlocktype() + N = 5 + locked = self.DummyList() + nlocked = self.MaxList() + self.Bunch(self.f1, N, (lock, locked, nlocked)).wait_for_finished() + self.assertTrue(max(nlocked) > 1) + + @classmethod + def f2(self, lock, locked, nlocked): + with lock.reader_lock: + with lock.reader_lock: + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + + def test_reader_recursion(self): + lock = self.rwlocktype() + N = 5 + locked = self.DummyList() + nlocked = self.MaxList() + self.Bunch(self.f2, N, (lock, locked, nlocked)).wait_for_finished() + self.assertTrue(max(nlocked) > 1) + + @classmethod + def f3(self, lock, locked, nlocked): + with lock.writer_lock: + with lock.writer_lock: + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + + def test_writer_recursion(self): + lock = self.rwlocktype() + N = 5 + locked = self.DummyList() + nlocked = self.MaxList() + self.Bunch(self.f3, N, (lock, locked, nlocked)).wait_for_finished() + self.assertEqual(max(nlocked), 1) + + @classmethod + def f4(self, lock, locked): + with lock.reader_lock: + self.assertRaises(RuntimeError, lock.acquire_write) + locked.append(1) + + def test_writer_recursionfail(self): + lock = self.rwlocktype() + N = 5 + locked = self.DummyList() + self.Bunch(self.f4, N, (lock, locked)).wait_for_finished() + self.assertEqual(len(locked), N) + + @classmethod + def r5(self, lock, rlocked, wlocked, nlocked): + with lock.reader_lock: + rlocked.append(1) + _wait() + nlocked.append((len(rlocked), len(wlocked))) + _wait() + rlocked.pop(-1) + + @classmethod + def w5(self, lock, rlocked, wlocked, nlocked): + with lock.writer_lock: + wlocked.append(1) + _wait() + nlocked.append((len(rlocked), len(wlocked))) + _wait() + wlocked.pop(-1) + + def test_readers_writers(self): + lock = self.rwlocktype() + N = 5 + rlocked = self.DummyList() + wlocked = self.DummyList() + nlocked = self.DummyList() + b1 = self.Bunch(self.r5, N, (lock, rlocked, wlocked, nlocked)) + b2 = self.Bunch(self.w5, N, (lock, rlocked, wlocked, nlocked)) + b1.wait_for_finished() + b2.wait_for_finished() + return #the lists aren't real, can't really iterate over them. + r, w, = zip(*nlocked) + self.assertTrue(max(r) > 1) + self.assertEqual(max(w), 1) + for r, w in nlocked: + if w: + self.assertEqual(r, 0) + if r: + self.assertEqual(w, 0) + + @classmethod + def r6(self, lock, reads, writes): + # read until we achive write successes + while len(writes) < 2: + with lock.reader_lock: + reads.append(1) + + @classmethod + def w6(self, lock, reads, writes): + while len(reads) == 0: + _wait() + for i in range(2): + _wait() + with lock.writer_lock: + writes.append(1) + + def test_writer_success(self): + """Verify that a writer can get access""" + lock = self.rwlocktype() + N = 5 + reads = self.DummyList() + writes = self.DummyList() + import time + t0 = time.clock() + b1 = self.Bunch(self.r6, N, (lock, reads, writes)) + b2 = self.Bunch(self.w6, 1, (lock, reads, writes)) + b1.wait_for_finished() + b2.wait_for_finished() + print ("***************** completed in %ss"%(time.clock()-t0)) + self.assertEqual(len(writes), 2) + # uncomment this to view performance + #print(writes, reads) + +# +# +# + +class _TestFairRWLock(_TestRWLock): + def rwlocktype(self): + return self.FairRWLock() + + # nerf tests that don"t support recursion + def test_reader_recursion(self): + self.skipTest("FairRWLock does not support recursion") + + test_writer_recursionfail = test_writer_recursion = test_reader_recursion + +# +# +# + class _TestValue(BaseTestCase): ALLOWED_TYPES = ('processes',) @@ -2923,7 +3120,7 @@ Process = multiprocessing.Process locals().update(get_attributes(multiprocessing, ( 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue', + 'Condition', 'Event', 'Barrier', 'RWLock', 'FairRWLock', 'Value', 'Array', 'RawValue', 'RawArray', 'current_process', 'active_children', 'Pipe', 'connection', 'JoinableQueue', 'Pool' ))) @@ -2951,7 +3148,7 @@ Process = multiprocessing.dummy.Process locals().update(get_attributes(multiprocessing.dummy, ( 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', - 'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process', + 'Condition', 'Event', 'Barrier', 'RWLock', 'FairRWLock', 'Value', 'Array', 'current_process', 'active_children', 'Pipe', 'connection', 'dict', 'list', 'Namespace', 'JoinableQueue', 'Pool' ))) diff -r 3f739f42be51 Lib/test/test_threading.py --- a/Lib/test/test_threading.py Mon Oct 01 07:18:55 2012 +0300 +++ b/Lib/test/test_threading.py Tue Oct 02 11:28:20 2012 +0000 @@ -815,6 +815,28 @@ class BarrierTests(lock_tests.BarrierTests): barriertype = staticmethod(threading.Barrier) +class RWLockTests(lock_tests.RWLockTests): + rwlocktype = staticmethod(threading.RWLock) + def locktype(self): + return self.rwlocktype().writer_lock + +class RWConditionTests(lock_tests.ConditionTests): + rwlocktype = staticmethod(threading.RWLock) + def condtype(self, lock=None): + if lock: + return threading.Condition(lock) + return threading.Condition(self.rwlocktype().writer_lock) + +class RWConditionAsRLockTests(lock_tests.RLockTests): + rwlocktype = staticmethod(threading.RWLock) + def locktype(self): + return threading.Condition(self.rwlocktype().writer_lock) + +class FairRWLockMixin: + rwlocktype = staticmethod(threading.FairRWLock) +class FairRWLockTests(FairRWLockMixin, RWLockTests): pass +class FairRWConditionTests(FairRWLockMixin, RWConditionTests): pass +class FairRWConditionAsRLockTests(FairRWLockMixin, RWConditionAsRLockTests): pass def test_main(): test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests, @@ -824,6 +846,12 @@ ThreadJoinOnShutdown, ThreadingExceptionTests, BarrierTests, + RWLockTests, + RWConditionTests, + RWConditionAsRLockTests, + FairRWLockTests, + FairRWConditionTests, + FairRWConditionAsRLockTests, ) if __name__ == "__main__": diff -r 3f739f42be51 Lib/threading.py --- a/Lib/threading.py Mon Oct 01 07:18:55 2012 +0300 +++ b/Lib/threading.py Tue Oct 02 11:28:20 2012 +0000 @@ -501,6 +501,192 @@ #exception raised by the Barrier class class BrokenBarrierError(RuntimeError): pass +class RWLockBase(object): + """ + A Reader/Writer lock, that can be acquired either in reader or writer mode. + In reader mode, the lock may be held by any number of threads. + In writer mode, only one thread can hold the lock. + A RWLock is reentrant in a limited fashion: A thread holding the lock + can always get another read lock. And a thread holding an write lock + can get another lock (read or write.) + But in general, a thread holding a read lock cannot recursively acquire an + write lock. + Any acquire_read() or acquire_write must be matched with a release(). + """ + + # Proxy classes that do either read or write locking + class _ReaderLock: + def __init__(self, lock): + self.lock = lock + + @staticmethod + def _timeout(blocking, timeout): + # A few sanity checks to satisfy the unittests. + if timeout < 0 and timeout != -1: + raise ValueError("invalid timeout") + if timeout > TIMEOUT_MAX: + raise OverflowError + if blocking: + return timeout if timeout >= 0 else None + if timeout > 0: + raise ValueError("can't specify a timeout when non-blocking") + return 0 + + def acquire(self, blocking=True, timeout=-1): + return self.lock.acquire_read(self._timeout(blocking, timeout)) + + def release(self): + self.lock.release() + + def __enter__(self): + self.acquire() + + def __exit__(self, exc, val, tb): + self.release() + + def _is_owned(self): + raise TypeError("a reader lock cannot be used with a Condition") + + class _WriterLock(_ReaderLock): + def acquire(self, blocking=True, timeout=-1): + return self.lock.acquire_write(self._timeout(blocking, timeout)) + + def _is_owned(self): + return self.lock._is_owned() + + def _release_save(self): + return self.lock._release_save() + + def _acquire_restore(self, arg): + return self.lock._acquire_restore(arg) + + @property + def reader_lock(self): + """ + Return a proxy object that acquires and releases the lock in read mode + """ + return self._ReaderLock(self) + + @property + def writer_lock(self): + """ + Return a proxy object that acquires and releases the lock in write mode + """ + return self._WriterLock(self) + +class RWLock(RWLockBase): + def __init__(self): + self.cond = Condition() + self.state = 0 # positive is shared count, negative exclusive count + self.owning = [] # threads will be few, so a list is not inefficient + + def acquire_read(self, timeout=None): + """ + Acquire the lock in shared mode + """ + with self.cond: + return self.cond.wait_for(self._acquire_read, timeout) + + def acquire_write(self, timeout=None): + """ + Acquire the lock in exclusive mode + """ + with self.cond: + return self.cond.wait_for(self._acquire_write, timeout) + + def release(self): + """ + Release the lock + """ + with self.cond: + me = get_ident() + try: + self.owning.remove(get_ident()) + except ValueError: + raise RuntimeError("cannot release an un-acquired lock") + if self.state > 0: + self.state -= 1 + else: + self.state += 1 + if self.state == 0: + self.cond.notify_all() + + def _acquire_write(self): + #we can only take the write lock if no one is there, or we already hold the lock + me = get_ident() + if self.state == 0 or (self.state < 0 and me in self.owning): + self.state -= 1 + self.owning.append(me) + return True + if self.state > 0 and me in self.owning: + raise RuntimeError("cannot upgrade RWLock from read to write") + return False + + def _acquire_read(self): + if self.state < 0: + # lock is in write mode. See if it is ours and we can recurse + return self._acquire_write() + self.state += 1 + self.owning.append(get_ident()) + return True + + # Interface for condition variable. Must hold an exclusive lock since the + # condition variable's state may be protected by the lock + def _is_owned(self): + return self.state < 0 and get_ident() in self.owning + + def _release_save(self): + # In a exlusively locked state, get the recursion level and free the lock + with self.cond: + if get_ident() not in self.owning: + raise RuntimeError("cannot release an un-acquired lock") + r = self.owning + self.owning = [] + self.state = 0 + self.cond.notify_all() + return r + + def _acquire_restore(self, x): + # Reclaim the exclusive lock at the old recursion level + self.acquire_write() + with self.cond: + self.owning = x + self.state = -len(x) + +# A specialization giving writer priority +class FairRWLock(RWLock): + def __init__(self): + super(FairRWLock, self).__init__() + self.waiting = 0 + + def acquire_write(self, timeout=None): + """ + Acquire the lock in exclusive mode + """ + with self.cond: + self.waiting += 1 + try: + return self.cond.wait_for(self._acquire_write, timeout) + finally: + self.waiting -= 1 + + def _acquire_read(self): + if self.state < 0: + # lock is in write mode. See if it is ours and we can recurse + return self._acquire_write() + + # Implement "exclusive bias" giving exclusive lock priority. + me = get_ident() + if not self.waiting: + ok = True # no exclusive acquires waiting. + else: + # Recursion must have the highest priority + ok = me in self.owning + + if ok: + self.state += 1 + self.owning.append(me) + return ok # Helper to generate new thread names _counter = 0