Index: Lib/test/test_capi.py =================================================================== --- Lib/test/test_capi.py (revision 68340) +++ Lib/test/test_capi.py (working copy) @@ -2,9 +2,72 @@ # these are all functions _testcapi exports whose name begins with 'test_'. import sys +import time +import random +import unittest +import threading from test import test_support import _testcapi +class TestCAPI(unittest.TestCase): + + def pendingcalls_submit(self, l, n): + def callback(): + l[0] += 1 + + for i in range(n): + time.sleep(random.random()*0.02) #0.01 secs on average + #try submitting callback until successful. + #rely on regular interrupt to flush queue if we are + #unsuccessful. + while True: + if _testcapi._pending_threadfunc(callback): + break; + + def pendingcalls_wait(self, l, n): + #now, stick around until l[0] has grown to 10 + count = 0; + while l[0] != n: + #this busy loop is where we expect to be interrupted to + #run our callbacks. Note that callbacks are only run on the + #main thread + if False and test_support.verbose: + print "(%i)"%(l[0],), + for i in xrange(1000): + a = i*i + count += 1 + self.failUnless(count < 10000, + "timeout waiting for %i callbacks, got %i"%(n, count)) + if False and test_support.verbose: + print "(%i)"%(l[0],) + + def test_pendingcalls_threaded(self): + l = [0] + + #do every callback on a separate thread + n = 32 + threads = [] + for i in range(n): + t = threading.Thread(target=self.pendingcalls_submit, args = (l, 1)) + t.start() + threads.append(t) + + self.pendingcalls_wait(l, n) + + for t in threads: + t.join() + + def test_pendingcalls_non_threaded(self): + #again, just using the main thread, likely they will all be dispathced at + #once. It is ok to ask for too many, because we loop until we find a slot. + #the loop can be interrupted to dispatch. + #there are only 32 dispatch slots, so we go for twice that! + l = [0] + n = 64 + self.pendingcalls_submit(l, n) + self.pendingcalls_wait(l, n) + + def test_main(): for name in dir(_testcapi): @@ -49,6 +112,7 @@ t=threading.Thread(target=TestThreadState) t.start() t.join() + unittest.main() if __name__ == "__main__": test_main() Index: Modules/_testcapimodule.c =================================================================== --- Modules/_testcapimodule.c (revision 68340) +++ Modules/_testcapimodule.c (working copy) @@ -837,6 +837,43 @@ return NULL; Py_RETURN_NONE; } + +/* test Py_AddPendingCalls using threads */ +static int _pending_callback(void *arg) +{ + /* we assume the argument is callable object to which we own a reference */ + PyObject *callable = (PyObject *)arg; + PyObject *r = PyObject_CallObject(callable, NULL); + Py_DECREF(callable); + Py_XDECREF(r); + return r != NULL ? 0 : -1; +} + +/* The following requests n callbacks to _pending_callback. It can be + * run from any python thread. + */ +PyObject *pending_threadfunc(PyObject *self, PyObject *arg) +{ + PyObject *callable; + int r; + if (PyArg_ParseTuple(arg, "O", &callable) == 0) + return NULL; + + /* create the reference for the callbackwhile we hold the lock */ + Py_INCREF(callable); + + Py_BEGIN_ALLOW_THREADS + r = Py_AddPendingCall(&_pending_callback, callable); + Py_END_ALLOW_THREADS + + if (r<0) { + Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */ + Py_INCREF(Py_False); + return Py_False; + } + Py_INCREF(Py_True); + return Py_True; +} #endif /* Some tests of PyString_FromFormat(). This needs more tests. */ @@ -941,6 +978,7 @@ #endif #ifdef WITH_THREAD {"_test_thread_state", test_thread_state, METH_VARARGS}, + {"_pending_threadfunc", pending_threadfunc, METH_VARARGS}, #endif {"traceback_print", traceback_print, METH_VARARGS}, {NULL, NULL} /* sentinel */