Index: Doc/c-api/init.rst =================================================================== --- Doc/c-api/init.rst (revision 68340) +++ Doc/c-api/init.rst (working copy) @@ -780,6 +780,50 @@ .. versionadded:: 2.3 + +Asynchronous Notifications +========================== + +A mechanism is provided to make asynchronous notifications to the the main +interpreter thread. These notifications take the form of a function +pointer and a void argument. + +.. index:: single: setcheckinterval() (in module sys) + +Every check interval, when the interpreter lock is released and reacquired, +python will also call any such provided functions. This can be used for +example by asynchronous IO handlers. The notification can be scheduled +from a worker thread and the actual call than made at the earliest +convenience by the main thread where it has possession of the global +interpreter lock and can perform any Python API calls. + +.. cfunction:: void Py_AddPendingCall( int (*func)(void *), void *arg) ) + + .. index:: single: Py_AddPendingCall() + + Post a notification to the Python main thread. If successful, + \*:attr`func` will be called with the argument :attr:`arg` at the earliest + convenience. \*:attr:`func` will be called having the global interpreter + lock held and can thus use the full Python API and can take any + action such as setting object attributes to signal IO completion. + It must return 0 on success, or -1 signalling an exception. + The notification function won't be interrupted to perform another + asynchronous notification recursively, + but it can still be interrupted to switch threads if the interpreter + lock is released, for example, if it calls back into python code. + + This function returns 0 on success in which case the notification has been + scheduled. Otherwise, for example if the notification buffer is full, + it returns -1 without setting any exception. + + This function can be called on any thread, be it a Python thread or + some other system thread. If it is a Python thread, it doesen't matter if + it holds the global interpreter lock or not. + + .. versionadded:: 2.7 + + + .. _profiling: Profiling and Tracing Index: Doc/whatsnew/2.7.rst =================================================================== --- Doc/whatsnew/2.7.rst (revision 68340) +++ Doc/whatsnew/2.7.rst (working copy) @@ -60,7 +60,12 @@ .. ======================================================================== +Kristján Valur Jónsson, issue 4293 +Py_AddPendingCall is now thread safe. This allows any worker thread +to submit notifications to the python main thread. This is particularly +useful for asynchronous IO operations. + Other Language Changes ====================== Index: Lib/test/test_capi.py =================================================================== --- Lib/test/test_capi.py (revision 68340) +++ Lib/test/test_capi.py (working copy) @@ -2,9 +2,74 @@ # these are all functions _testcapi exports whose name begins with 'test_'. import sys +import time +import random +import unittest +import threading from test import test_support import _testcapi +class TestCAPI(unittest.TestCase): + + def pendingcalls_submit(self, l, n): + def callback(): + #this function can be interrupted by thread switching so let's + #use an atomic operation + l.append(None) + + for i in range(n): + time.sleep(random.random()*0.02) #0.01 secs on average + #try submitting callback until successful. + #rely on regular interrupt to flush queue if we are + #unsuccessful. + while True: + if _testcapi._pending_threadfunc(callback): + break; + + def pendingcalls_wait(self, l, n): + #now, stick around until l[0] has grown to 10 + count = 0; + while len(l) != n: + #this busy loop is where we expect to be interrupted to + #run our callbacks. Note that callbacks are only run on the + #main thread + if False and test_support.verbose: + print "(%i)"%(len(l),), + for i in xrange(1000): + a = i*i + count += 1 + self.failUnless(count < 10000, + "timeout waiting for %i callbacks, got %i"%(n, len(l))) + if False and test_support.verbose: + print "(%i)"%(len(l),) + + def test_pendingcalls_threaded(self): + l = [] + + #do every callback on a separate thread + n = 32 + threads = [] + for i in range(n): + t = threading.Thread(target=self.pendingcalls_submit, args = (l, 1)) + t.start() + threads.append(t) + + self.pendingcalls_wait(l, n) + + for t in threads: + t.join() + + def test_pendingcalls_non_threaded(self): + #again, just using the main thread, likely they will all be dispathced at + #once. It is ok to ask for too many, because we loop until we find a slot. + #the loop can be interrupted to dispatch. + #there are only 32 dispatch slots, so we go for twice that! + l = [] + n = 64 + self.pendingcalls_submit(l, n) + self.pendingcalls_wait(l, n) + + def test_main(): for name in dir(_testcapi): @@ -49,6 +114,7 @@ t=threading.Thread(target=TestThreadState) t.start() t.join() + unittest.main() if __name__ == "__main__": test_main() Index: Modules/_testcapimodule.c =================================================================== --- Modules/_testcapimodule.c (revision 68340) +++ Modules/_testcapimodule.c (working copy) @@ -837,6 +837,43 @@ return NULL; Py_RETURN_NONE; } + +/* test Py_AddPendingCalls using threads */ +static int _pending_callback(void *arg) +{ + /* we assume the argument is callable object to which we own a reference */ + PyObject *callable = (PyObject *)arg; + PyObject *r = PyObject_CallObject(callable, NULL); + Py_DECREF(callable); + Py_XDECREF(r); + return r != NULL ? 0 : -1; +} + +/* The following requests n callbacks to _pending_callback. It can be + * run from any python thread. + */ +PyObject *pending_threadfunc(PyObject *self, PyObject *arg) +{ + PyObject *callable; + int r; + if (PyArg_ParseTuple(arg, "O", &callable) == 0) + return NULL; + + /* create the reference for the callbackwhile we hold the lock */ + Py_INCREF(callable); + + Py_BEGIN_ALLOW_THREADS + r = Py_AddPendingCall(&_pending_callback, callable); + Py_END_ALLOW_THREADS + + if (r<0) { + Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */ + Py_INCREF(Py_False); + return Py_False; + } + Py_INCREF(Py_True); + return Py_True; +} #endif /* Some tests of PyString_FromFormat(). This needs more tests. */ @@ -941,6 +978,7 @@ #endif #ifdef WITH_THREAD {"_test_thread_state", test_thread_state, METH_VARARGS}, + {"_pending_threadfunc", pending_threadfunc, METH_VARARGS}, #endif {"traceback_print", traceback_print, METH_VARARGS}, {NULL, NULL} /* sentinel */ Index: Python/ceval.c =================================================================== --- Python/ceval.c (revision 68340) +++ Python/ceval.c (working copy) @@ -213,6 +213,7 @@ #include "pythread.h" static PyThread_type_lock interpreter_lock = 0; /* This is the GIL */ +static PyThread_type_lock pending_lock = 0; /* for pending calls */ static long main_thread = 0; int @@ -284,6 +285,7 @@ adding a new function to each thread_*.h. Instead, just create a new lock and waste a little bit of memory */ interpreter_lock = PyThread_allocate_lock(); + pending_lock = PyThread_allocate_lock(); PyThread_acquire_lock(interpreter_lock, 1); main_thread = PyThread_get_thread_ident(); @@ -356,19 +358,145 @@ #ifdef WITH_THREAD Any thread can schedule pending calls, but only the main thread will execute them. + There is no facility to schedule calls to a particular thread, but + that should be easy to change, should that ever be required. In + that case, the static variables here should go into the python + threadstate. #endif +*/ - XXX WARNING! ASYNCHRONOUSLY EXECUTING CODE! +#ifdef WITH_THREAD + +/* The WITH_THREAD implementation is thread-safe. It allows + scheduling to be made from any thread, and even from an executing + callback. + */ + +#define NPENDINGCALLS 32 +static struct { + int (*func)(void *); + void *arg; +} pendingcalls[NPENDINGCALLS]; +static int pendingfirst = 0; +static int pendinglast = 0; +static volatile int pendingcalls_to_do = 1; /* trigger initialization of lock */ +static char pendingbusy = 0; + +int +Py_AddPendingCall(int (*func)(void *), void *arg) +{ + int i, j, result=0; + PyThread_type_lock lock = pending_lock; + + /* try a few times for the lock. Since this mechanism is used + * for signal handling (on the main thread), there is a (slim) + * chance that a signal is delivered on the same thread while we + * hold the lock during the Py_MakePendingCalls() function. + * This avoids a deadlock in that case. + * Note that signals can be delivered on any thread. In particular, + * on Windows, a SIGINT is delivered on a system-created worker + * thread. + * We also check for lock being NULL, in the unlikely case that + * this function is called before any bytecode evaluation takes place. + */ + if (lock != NULL) { + for (i = 0; i<100; i++) { + if (PyThread_acquire_lock(lock, NOWAIT_LOCK)) + break; + } + if (i == 100) + return -1; + } + + i = pendinglast; + j = (i + 1) % NPENDINGCALLS; + if (j == pendingfirst) { + result = -1; /* Queue full */ + } else { + pendingcalls[i].func = func; + pendingcalls[i].arg = arg; + pendinglast = j; + } + /* signal main loop */ + _Py_Ticker = 0; + pendingcalls_to_do = 1; + if (lock != NULL) + PyThread_release_lock(lock); + return result; +} + +int +Py_MakePendingCalls(void) +{ + int i; + int r = 0; + + if (!pending_lock) { + /* initial allocation of the lock */ + pending_lock = PyThread_allocate_lock(); + if (pending_lock == NULL) + return -1; + } + + /* only service pending calls on main thread */ + if (main_thread && PyThread_get_thread_ident() != main_thread) + return 0; + /* don't perform recursive pending calls */ + if (pendingbusy) + return 0; + pendingbusy = 1; + /* perform a bounded number of calls, in case of recursion */ + for (i=0; i