Index: Lib/test/lock_tests.py =================================================================== --- Lib/test/lock_tests.py (revision 79534) +++ Lib/test/lock_tests.py (working copy) @@ -234,6 +234,102 @@ self.assertFalse(lock._is_owned()) +class RWLockTests(RLockTests): + """ + Tests for RWLock objects + """ + def test_many_readers(self): + lock = self.locktype() + N = 5 + locked = [] + nlocked = [] + def f(): + with lock.rdlocked(): + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + Bunch(f, N).wait_for_finished() + self.assertTrue(max(nlocked) > 1) + + def test_rdrecursion(self): + lock = self.locktype() + N = 5 + locked = [] + nlocked = [] + def f(): + with lock.rdlocked(): + with lock.rdlocked(): + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + Bunch(f, N).wait_for_finished() + self.assertTrue(max(nlocked) > 1) + + def test_wrrecursion(self): + lock = self.locktype() + N = 5 + locked = [] + nlocked = [] + def f(): + with lock.wrlocked(): + with lock.rdlocked(): + locked.append(1) + _wait() + nlocked.append(len(locked)) + _wait() + locked.pop(-1) + Bunch(f, N).wait_for_finished() + self.assertEqual(max(nlocked), 1) + + def test_wrrecursionfail(self): + lock = self.locktype() + N = 5 + locked = [] + def f(): + with lock.rdlocked(): + self.assertRaises(RuntimeError, lock.wrlock) + locked.append(1) + Bunch(f, N).wait_for_finished() + self.assertEqual(len(locked), N) + + def test_readers_writers(self): + lock = self.locktype() + N = 5 + rlocked = [] + wlocked = [] + nlocked = [] + def r(): + with lock.rdlocked(): + rlocked.append(1) + _wait() + nlocked.append((len(rlocked), len(wlocked))) + _wait() + rlocked.pop(-1) + def w(): + with lock.wrlocked(): + wlocked.append(1) + _wait() + nlocked.append((len(rlocked), len(wlocked))) + _wait() + wlocked.pop(-1) + b1 = Bunch(r, N) + b2 = Bunch(w, N) + b1.wait_for_finished() + b2.wait_for_finished() + r, w, = zip(*nlocked) + self.assertTrue(max(r) > 1) + self.assertEqual(max(w), 1) + for r, w in nlocked: + if w: + self.assertEqual(r, 0) + if r: + self.assertEqual(w, 0) + + class EventTests(BaseTestCase): """ Tests for Event objects. Index: Lib/test/test_threading.py =================================================================== --- Lib/test/test_threading.py (revision 79534) +++ Lib/test/test_threading.py (working copy) @@ -534,7 +534,20 @@ class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): semtype = staticmethod(threading.BoundedSemaphore) +class RWLockTests(lock_tests.RWLockTests): + locktype = staticmethod(threading.RWLock) +class RWConditionTests(lock_tests.ConditionTests): + def condtype(self, lock=None): + if lock: + return threading.Condition(lock) + return threading.Condition(threading.RWLock()) + +class RWConditionAsRLockTests(lock_tests.RLockTests): + def locktype(self): + return threading.Condition(threading.RWLock()) + + def test_main(): test.test_support.run_unittest(LockTests, RLockTests, EventTests, ConditionAsRLockTests, ConditionTests, @@ -542,6 +555,8 @@ ThreadTests, ThreadJoinOnShutdown, ThreadingExceptionTests, + RWLockTests, RWConditionTests, + RWConditionAsRLockTests, ) if __name__ == "__main__": Index: Lib/threading.py =================================================================== --- Lib/threading.py (revision 79534) +++ Lib/threading.py (working copy) @@ -396,6 +396,153 @@ finally: self.__cond.release() + +def RWLock(*args, **kwargs): + return _RWLock(*args, **kwargs) + +class _RWLock(object): + """ + A Reader-Writer lock. Allows multiple readers at the same time but + only one Writer (with recursion). + Writers have priority over readers. + A RWLock is reentrant in a limited fashion: A thread holding the lock + can always get another read lock. And a thread holding a write lock can get another + write lock. But in general, a thread holding a read lock cannot recursively acquire a write lock. + Of course, any recursive lock (rdlock, or wrlock) must be mathced with an release. + """ + def __init__(self): + self.lock = Lock() + self.rcond = Condition(self.lock) + self.wcond = Condition(self.lock) + self.nr = self.nw = 0 #number of waiting threads + self.state = 0 #positive is readercount, negative writer count + self.owning = [] #threads will be few, so a list is not inefficient + + def wrlock(self, blocking=True): + """ + Get a Write lock + """ + me = _get_ident() + with self.lock: + while not self._wrlock(me): + if not blocking: + return False + self.nw += 1 + self.wcond.wait() + self.nw -= 1 + return True + + def _wrlock(self, me): + #we can only take the write lock if no one is there, or we already hold the lock + if self.state == 0 or (self.state < 0 and me in self.owning): + self.state -= 1 + self.owning.append(me) + return True + if self.state > 0 and me in self.owning: + raise RuntimeError("cannot recursively wrlock a rdlocked lock") + return False + + def rdlock(self, blocking=True): + """ + Read lock the lock + """ + me = _get_ident() + with self.lock: + while not self._rdlock(me): + if not blocking: + return False + #keep track of the number of readers waiting to limit + #the number of notify_all() calls required. + self.nr += 1 + self.rcond.wait() + self.nr -= 1 + return True + + def _rdlock(self, me): + if self.state < 0: + #we are write locked. See if we reacquire + return self._wrlock(me) + + if not self.nw: + ok = True #no writers waiting for the lock + else: + #there is a writer waiting, but still, allow for recursion + ok = me in self.owning + + if ok: + self.state += 1 + self.owning.append(me) + return True + return False + + def unlock(self): + """ + Release the lock + """ + me = _get_ident() + with self.lock: + try: + self.owning.remove(me) + except ValueError: + raise RuntimeError("cannot release un-acquired lock") + + if self.state > 0: + self.state -= 1 + else: + self.state += 1 + if not self.state: + if self.nw: + self.wcond.notify() + elif self.nr: + self.rcond.notify_all() + + #acquire/release api, for RLock compatibility + acquire = wrlock + release = unlock + + #context manager, gets a write lock + def __enter__(self): + self.wrlock() + def __exit__(self, e, v, tb): + self.unlock() + + #A separate ctxt mgr for read locking + class _RdLockContext(object): + def __init__(self, lock): + self.lock = lock + def __enter__(self): + self.lock.rdlock() + def __exit__(self, e, v, tb): + self.lock.unlock() + def rdlocked(self): + return self._RdLockContext(self) + def wrlocked(self): + return self #for symmetry + + #interface for condition variable. Must hold a Write lock + def _is_owned(self): + return self.state < 0 and _get_ident() in self.owning + + def _release_save(self): + #in a write locked state, get the recursion level and free the lock + with self.lock: + r = self.owning + self.owning = [] + self.state = 0 + if self.nw: + self.wcond.notify() + elif self.nr: + self.rcond.notify_all() + return r + + def _acquire_restore(self, x): + #reclaim the lock at the old recursion level + self.wrlock() + with self.lock: + self.owning = x + self.state = -len(x) + + # Helper to generate new thread names _counter = 0 def _newname(template="Thread-%d"):