From ed513752b52821cfd6ab62d26ab515e8491bbe35 Mon Sep 17 00:00:00 2001 From: Sebastian Noack Date: Sun, 30 Sep 2012 02:56:11 +0200 Subject: [PATCH] Added ShrdExclLock to threading and multiprocessing module. --- Lib/multiprocessing/__init__.py | 11 ++++- Lib/multiprocessing/synchronize.py | 20 +++++++- Lib/threading.py | 88 +++++++++++++++++++++++++++++++++++- 3 files changed, 115 insertions(+), 4 deletions(-) diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py index 1f3e67c..0dff5bd 100644 --- a/Lib/multiprocessing/__init__.py +++ b/Lib/multiprocessing/__init__.py @@ -23,8 +23,8 @@ __all__ = [ 'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger', 'allow_connection_pickling', 'BufferTooShort', 'TimeoutError', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', - 'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool', - 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', + 'Event', 'Barrier', 'ShrdExclLock', 'Queue', 'SimpleQueue', 'JoinableQueue', + 'Pool', 'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING', ] __author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)' @@ -193,6 +193,13 @@ def Barrier(parties, action=None, timeout=None): from multiprocessing.synchronize import Barrier return Barrier(parties, action, timeout) +def ShrdExclLock(): + ''' + Returns a shared and exclusive lock object + ''' + from multiprocessing.synchronize import ShrdExclLock + return ShrdExclLock() + def Queue(maxsize=0): ''' Returns a queue object diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index 22eabe5..c0da092 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -8,7 +8,7 @@ # __all__ = [ - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event', 'ShrdExclLock' ] import threading @@ -373,3 +373,21 @@ class Barrier(threading.Barrier): @_count.setter def _count(self, value): self._array[1] = value + +# +# ShrdExclLock +# + +def ShrdExclLock(): + from multiprocessing.sharedctypes import RawValue + + cond = Condition(Lock()) + + shrd_count = RawValue('I', 0) + excl_count = RawValue('I', 0) + + waiting_count = RawValue('I', 0) + granted_count = RawValue('I', 0) + + return (threading._ShrdLock(cond, shrd_count, excl_count, waiting_count, granted_count), + threading._ExclLock(cond, shrd_count, excl_count, waiting_count, granted_count)) diff --git a/Lib/threading.py b/Lib/threading.py index 6c34d49..0483649 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -23,7 +23,8 @@ from _weakrefset import WeakSet __all__ = ['active_count', 'Condition', 'current_thread', 'enumerate', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', - 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size'] + 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', + 'stack_size', 'ShrdExclLock'] # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread @@ -502,6 +503,91 @@ class Barrier: class BrokenBarrierError(RuntimeError): pass +class _ShrdExclLock: + def __init__(self, cond, shrd_count, excl_count, waiting_count, granted_count): + self._cond = cond + self._shrd_count = shrd_count + self._excl_count = excl_count + self._waiting_count = waiting_count + self._granted_count = granted_count + self._count = self._get_count() + + def acquire(self, blocking=True): + with self._cond: + waitno = self._waiting_count.value + + if waitno == self._granted_count.value and self._can_acquire(): + self._count.value += 1 + return True + + if not blocking: + return False + + self._waiting_count.value = waitno + 1 + + while True: + self._cond.wait() + + if waitno == self._granted_count.value and self._can_acquire(): + self._count.value += 1 + + self._granted_count.value += 1 + self._cond.notify_all() + + return True + + def release(self): + with self._cond: + if self._count.value == 0: + raise ValueError('lock released to many times') + self._count.value -= 1 + self._cond.notify_all() + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, exc_type, exc_value, tb): + self.release() + + def __getstate__(self): + return (self._cond, self._shrd_count, self._excl_count, + self._waiting_count, self._granted_count) + + def __setstate__(self, state): + (self._cond, self._shrd_count, self._excl_count, + self._waiting_count, self._granted_count) = state + self._count = self._get_count() + +class _ShrdLock(_ShrdExclLock): + def _get_count(self): + return self._shrd_count + + def _can_acquire(self): + return 0 == self._excl_count.value + +class _ExclLock(_ShrdExclLock): + def _get_count(self): + return self._excl_count + + def _can_acquire(self): + return 0 == self._excl_count.value == self._shrd_count.value + +def ShrdExclLock(): + from ctypes import c_uint + + cond = Condition(Lock()) + + shrd_count = c_uint(0) + excl_count = c_uint(0) + + waiting_count = c_uint(0) + granted_count = c_uint(0) + + return (_ShrdLock(cond, shrd_count, excl_count, waiting_count, granted_count), + _ExclLock(cond, shrd_count, excl_count, waiting_count, granted_count)) + + # Helper to generate new thread names _counter = 0 def _newname(template="Thread-%d"): -- 1.7.10.4