From 8eda9ea38da9fc6ffd38f511827126e3c426dc07 Mon Sep 17 00:00:00 2001 From: Sebastian Noack Date: Sun, 30 Sep 2012 02:56:11 +0200 Subject: [PATCH] Added ShrdExclLock to threading and multiprocessing module. --- Lib/_synchronize.py | 57 ++++++++++++++++++++++++++++++++++++ Lib/multiprocessing/__init__.py | 7 +++++ Lib/multiprocessing/synchronize.py | 11 ++++++- Lib/threading.py | 8 ++++- 4 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 Lib/_synchronize.py diff --git a/Lib/_synchronize.py b/Lib/_synchronize.py new file mode 100644 index 0000000..ff5f27f --- /dev/null +++ b/Lib/_synchronize.py @@ -0,0 +1,57 @@ +class _ShrdExclLock(object): + def __init__(self, cond, count, num_acq_lock, num_got_lock, requirement): + self._cond = cond + self._count = count + self._num_acq_lock = num_acq_lock + self._num_got_lock = num_got_lock + self._requirement = requirement + + def acquire(self, blocking=True): + with self._cond: + waitno = self._num_acq_lock.value + self._num_acq_lock.value = waitno + 1 + + try: + while waitno != self._num_got_lock.value or not self._requirement(): + if not blocking: + return False + self._cond.wait() + finally: + self._num_got_lock.value += 1 + self._cond.notify_all() + + self._count.value += 1 + + return True + + def release(self): + with self._cond: + if self._count.value == 0: + raise ValueError('lock released to many times') + self._count.value -= 1 + self._cond.notify_all() + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, exc_type, exc_value, tb): + self.release() + +def make_shrd_excl_lock(Condition, c_uint): + def ShrdExclLock(): + cond = Condition() + + shrd_count = c_uint(0) + excl_count = c_uint(0) + + num_acq_lock = c_uint(0) + num_got_lock = c_uint(0) + + can_acquire_shrd_lock = lambda : 0 == excl_count.value + can_acquire_excl_lock = lambda : 0 == excl_count.value == shrd_count.value + + return (_ShrdExclLock(cond, shrd_count, num_acq_lock, num_got_lock, can_acquire_shrd_lock), + _ShrdExclLock(cond, excl_count, num_acq_lock, num_got_lock, can_acquire_excl_lock)) + + return ShrdExclLock diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 1f3e67c..934dc0f 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -193,6 +193,13 @@ def Barrier(parties, action=None, timeout=None): from multiprocessing.synchronize import Barrier return Barrier(parties, action, timeout) +def ShrdExclLock(): + ''' + Returns a shared and exclusive lock object + ''' + from multiprocessing.synchronize import ShrdExclLock + return ShrdExclLock() + def Queue(maxsize=0): ''' Returns a queue object diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 22eabe5..4c9722b 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -8,7 +8,7 @@ # __all__ = [ - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event', 'ShrdExclLock' ] import threading @@ -18,7 +18,10 @@ import _multiprocessing from multiprocessing.process import current_process from multiprocessing.util import register_after_fork, debug from multiprocessing.forking import assert_spawning, Popen +from multiprocessing.sharedctypes import RawValue from time import time as _time +from functools import partial as _partial +from _synchronize import make_shrd_excl_lock as _make_shrd_excl_lock # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. @@ -373,3 +376,9 @@ class Barrier(threading.Barrier): @_count.setter def _count(self, value): self._array[1] = value + +# +# ShrdExclLock +# + +ShrdExclLock = _make_shrd_excl_lock(Condition, _partial(RawValue, 'I')) diff --git a/Lib/threading.py b/Lib/threading.py index 6c34d49..e35b4d3 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -1,6 +1,7 @@ """Thread module emulating a subset of Java's threading model.""" import sys as _sys +import ctypes as _ctypes import _thread from time import sleep as _sleep @@ -10,6 +11,7 @@ except ImportError: from time import time as _time from traceback import format_exc as _format_exc from _weakrefset import WeakSet +from _synchronize import make_shrd_excl_lock as _make_shrd_excl_lock # Note regarding PEP 8 compliant names # This threading model was originally inspired by Java, and inherited @@ -23,7 +25,8 @@ from _weakrefset import WeakSet __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', - 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size'] + 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', + 'stack_size', 'ShrdExclLock'] # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread @@ -502,6 +505,9 @@ class Barrier: class BrokenBarrierError(RuntimeError): pass +ShrdExclLock = _make_shrd_excl_lock(Condition, _ctypes.c_uint) + + # Helper to generate new thread names _counter = 0 def _newname(template="Thread-%d"): -- 1.7.10.4