diff -r 3ebe2d261920 Include/pythread.h --- a/Include/pythread.h Thu May 12 16:18:29 2011 +0100 +++ b/Include/pythread.h Thu May 12 22:28:38 2011 +0200 @@ -84,6 +84,7 @@ PyAPI_FUNC(void) PyThread_delete_key_value(int key); /* Cleanup after a fork */ +PyAPI_FUNC(void) PyThread_ReInitLocks(void); PyAPI_FUNC(void) PyThread_ReInitTLS(void); #ifdef __cplusplus diff -r 3ebe2d261920 Lib/test/lock_tests.py --- a/Lib/test/lock_tests.py Thu May 12 16:18:29 2011 +0100 +++ b/Lib/test/lock_tests.py Thu May 12 22:28:38 2011 +0200 @@ -3,6 +3,7 @@ """ import sys +import os import time from _thread import start_new_thread, get_ident, TIMEOUT_MAX import threading @@ -286,6 +287,66 @@ self.assertFalse(lock._is_owned()) +class LockAfterForkTests(BaseTestCase): + """ + Tests that standard and internal locks (e.g. I/O) are correctly + re-initialized after fork. + """ + + def test_lock_after_fork(self): + lock = self.locktype() + lock.acquire() + pid = os.fork() + if pid == 0: + # the lock should be re-initialized + for i in range(5): + lock.acquire() + lock.release() + os._exit(0) + else: + lock.release() + os.waitpid(pid, 0) + + def test_lock_after_fork_from_thread(self): + # same thing, but with a lock acquired from another thread + lock = self.locktype() + def f(): + lock.acquire() + b = Bunch(f, 1) + b.wait_for_finished() + pid = os.fork() + if pid == 0: + # the lock should be re-initialized + lock.acquire() + lock.release() + os._exit(0) + else: + lock.release() + os.waitpid(pid, 0) + + def test_iolock_after_fork(self): + r, w = os.pipe() + rf = os.fdopen(r) + def f(): + # this will block with the file's lock held + rf.read(1) + b = Bunch(f, 1) + time.sleep(0.1) + # fork while the file's lock is held, and close the write-end so that + # the thread and child process receive EOF + pid = os.fork() + os.close(w) + if pid == 0: + # the file's lock should be reinitialized, should get EOF + rf.read(1) + rf.close() + os._exit(0) + else: + b.wait_for_finished() + rf.close() + os.waitpid(pid, 0) + + class EventTests(BaseTestCase): """ Tests for Event objects. diff -r 3ebe2d261920 Lib/test/test_threading.py --- a/Lib/test/test_threading.py Thu May 12 16:18:29 2011 +0100 +++ b/Lib/test/test_threading.py Thu May 12 22:28:38 2011 +0200 @@ -713,6 +713,10 @@ class CRLockTests(lock_tests.RLockTests): locktype = staticmethod(threading._CRLock) +@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") +class LockAfterForkTests(lock_tests.LockAfterForkTests): + locktype = staticmethod(threading.Lock) + class EventTests(lock_tests.EventTests): eventtype = staticmethod(threading.Event) @@ -734,7 +738,8 @@ def test_main(): - test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests, + test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, + LockAfterForkTests, EventTests, ConditionAsRLockTests, ConditionTests, SemaphoreTests, BoundedSemaphoreTests, ThreadTests, diff -r 3ebe2d261920 Modules/_threadmodule.c --- a/Modules/_threadmodule.c Thu May 12 16:18:29 2011 +0100 +++ b/Modules/_threadmodule.c Thu May 12 22:28:38 2011 +0200 @@ -322,7 +322,10 @@ r = acquire_timed(self->rlock_lock, microseconds); } if (r == PY_LOCK_ACQUIRED) { - assert(self->rlock_count == 0); + if (self->rlock_count != 0) { + PyErr_SetString(ThreadError, "couldn't acquire lock"); + return NULL; + } self->rlock_owner = tid; self->rlock_count = 1; } @@ -393,11 +396,10 @@ r = PyThread_acquire_lock(self->rlock_lock, 1); Py_END_ALLOW_THREADS } - if (!r) { + if (!r || self->rlock_count != 0) { PyErr_SetString(ThreadError, "couldn't acquire lock"); return NULL; } - assert(self->rlock_count == 0); self->rlock_owner = owner; self->rlock_count = count; Py_RETURN_NONE; diff -r 3ebe2d261920 Modules/signalmodule.c --- a/Modules/signalmodule.c Thu May 12 16:18:29 2011 +0100 +++ b/Modules/signalmodule.c Thu May 12 22:28:38 2011 +0200 @@ -1234,6 +1234,7 @@ PyOS_AfterFork(void) { #ifdef WITH_THREAD + PyThread_ReinitLocks(); _PyGILState_Reinit(); PyEval_ReInitThreads(); main_thread = PyThread_get_thread_ident(); diff -r 3ebe2d261920 Python/thread_pthread.h --- a/Python/thread_pthread.h Thu May 12 16:18:29 2011 +0100 +++ b/Python/thread_pthread.h Thu May 12 22:28:38 2011 +0200 @@ -104,6 +104,16 @@ } while(0) +/* pthread lock definition */ +#ifdef USE_SEMAPHORES + +typedef struct _pthread_lock { + sem_t sem; + struct _pthread_lock *next; +} pthread_lock; + +#else + /* A pthread mutex isn't sufficient to model the Python lock type * because, according to Draft 5 of the docs (P1003.4a/D5), both of the * following are undefined: @@ -119,15 +129,47 @@ * bit is cleared. 9 May 1994 tim@ksr.com */ -typedef struct { +typedef struct _pthread_lock { char locked; /* 0=unlocked, 1=locked */ /* a pair to handle an acquire of a locked lock */ pthread_cond_t lock_released; pthread_mutex_t mut; + struct _pthread_lock *next; } pthread_lock; +#endif /* USE_SEMAPHORES */ + #define CHECK_STATUS(name) if (status != 0) { perror(name); error = 1; } + +/* head of the linked list of locks */ +static pthread_lock *locks_head = NULL; + + +/* linked list handling routines */ +Py_LOCAL_INLINE(void) +list_add(pthread_lock **head, pthread_lock *e) +{ + e->next = *head; + *head = e; +} + +Py_LOCAL_INLINE(void) +list_remove(pthread_lock **head, pthread_lock *e) +{ + if (*head == e) { + *head = e->next; + } else { + pthread_lock *cur; + for (cur = *head; cur != NULL; cur = cur->next) { + if (cur->next == e) { + cur->next = e->next; + break; + } + } + } +} + /* * Initialization. */ @@ -258,22 +300,24 @@ PyThread_type_lock PyThread_allocate_lock(void) { - sem_t *lock; + pthread_lock *lock; int status, error = 0; dprintf(("PyThread_allocate_lock called\n")); if (!initialized) PyThread_init_thread(); - lock = (sem_t *)malloc(sizeof(sem_t)); + lock = malloc(sizeof(pthread_lock)); if (lock) { - status = sem_init(lock,0,1); + status = sem_init(&lock->sem,0,1); CHECK_STATUS("sem_init"); if (error) { free((void *)lock); lock = NULL; + } else { + list_add(&locks_head, lock); } } @@ -284,7 +328,7 @@ void PyThread_free_lock(PyThread_type_lock lock) { - sem_t *thelock = (sem_t *)lock; + pthread_lock *thelock = (pthread_lock *)lock; int status, error = 0; dprintf(("PyThread_free_lock(%p) called\n", lock)); @@ -292,12 +336,27 @@ if (!thelock) return; - status = sem_destroy(thelock); + list_remove(&locks_head, thelock); + + status = sem_destroy(&thelock->sem); CHECK_STATUS("sem_destroy"); free((void *)thelock); } +void +PyThread_reinit_lock(PyThread_type_lock lock) +{ + pthread_lock *thelock = (pthread_lock *)lock; + + dprintf(("PyThread_reinit_lock(%p) called\n", lock)); + + (void)sem_destroy(&thelock->sem); + + (void)sem_init(&thelock->sem, 0, 1); +} + + /* * As of February 2002, Cygwin thread implementations mistakenly report error * codes in the return value of the sem_ calls (like the pthread_ functions). @@ -315,7 +374,7 @@ int intr_flag) { PyLockStatus success; - sem_t *thelock = (sem_t *)lock; + pthread_lock *thelock = (pthread_lock *)lock; int status, error = 0; struct timespec ts; @@ -326,11 +385,11 @@ MICROSECONDS_TO_TIMESPEC(microseconds, ts); do { if (microseconds > 0) - status = fix_status(sem_timedwait(thelock, &ts)); + status = fix_status(sem_timedwait(&thelock->sem, &ts)); else if (microseconds == 0) - status = fix_status(sem_trywait(thelock)); + status = fix_status(sem_trywait(&thelock->sem)); else - status = fix_status(sem_wait(thelock)); + status = fix_status(sem_wait(&thelock->sem)); /* Retry if interrupted by a signal, unless the caller wants to be notified. */ } while (!intr_flag && status == EINTR); @@ -366,12 +425,12 @@ void PyThread_release_lock(PyThread_type_lock lock) { - sem_t *thelock = (sem_t *)lock; + pthread_lock *thelock = (pthread_lock *)lock; int status, error = 0; dprintf(("PyThread_release_lock(%p) called\n", lock)); - status = sem_post(thelock); + status = sem_post(&thelock->sem); CHECK_STATUS("sem_post"); } @@ -412,6 +471,8 @@ if (error) { free((void *)lock); lock = 0; + } else { + list_add(&locks_head, lock); } } @@ -427,6 +488,8 @@ dprintf(("PyThread_free_lock(%p) called\n", lock)); + list_remove(&locks_head, thelock); + status = pthread_mutex_destroy( &thelock->mut ); CHECK_STATUS("pthread_mutex_destroy"); @@ -436,6 +499,22 @@ free((void *)thelock); } +void +PyThread_reinit_lock(PyThread_type_lock lock) +{ + pthread_lock *thelock = (pthread_lock *)lock; + + dprintf(("PyThread_reinit_lock(%p) called\n", lock)); + + (void)pthread_mutex_destroy(&thelock->mut); + (void)pthread_cond_destroy(&thelock->lock_released); + + thelock->locked = 0; + (void)pthread_mutex_init(&thelock->mut, pthread_mutexattr_default); + _Py_ANNOTATE_PURE_HAPPENS_BEFORE_MUTEX(&thelock->mut); + (void)pthread_cond_init(&thelock->lock_released, pthread_condattr_default); +} + PyLockStatus PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds, int intr_flag) @@ -525,6 +604,37 @@ #endif /* USE_SEMAPHORES */ +/* + * After fork, POSIX explicitly states that locks and other synchronization + * primitives are unusable in the child process. This can trigger random + * deadlocks and even segfaults in rare cases. + * To mitigate this issue, PyThread_ReinitLocks is called by PyOS_AfterFork and + * re-initializes locks so that they are usable in the child process. + * Notes: + * - This only applies to raw locks, i.e. locks allocated directly through + * PyThread_allocate_lock, such as threading.Lock and locks used in C code + * (e.g. I/O code). This doesn't solve the problem for RLock and + * higher-level synchronization primitives such as Condition and Event + * (those could probably be handled by a pthread_atfork-like mechanism + * invoked by PyOS_AfterFork). + * - Calling pthread_mutex_destroy/sem_destroy from the child process should + * be safe and avoid leaks. If it turns out to cause problems with some + * pthread implementations, those could probably be removed, calling just + * pthread_mutex_init/sem_init. + * - The linked list of locks (locks_head) doesn't need to be protected from + * concurrent access because locks are created/destroyed with the GIL held, + * and PyThread_ReinitLocks is called after fork, so there's only one + * running thread at that point. +*/ +void +PyThread_ReinitLocks(void) +{ + pthread_lock *l; + + for (l = locks_head; l != NULL; l = l->next) + PyThread_reinit_lock(l); +} + int PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) {