+++ /dev/null
-# Run the _testcapi module tests (tests for the Python/C API): by defn,\r
-# these are all functions _testcapi exports whose name begins with 'test_'.\r
-\r
-from __future__ import with_statement\r
-import sys\r
-import time\r
-import random\r
-import unittest\r
-from test import test_support\r
-try:\r
- import threading\r
-except ImportError:\r
- threading = None\r
-import _testcapi\r
-\r
-@unittest.skipUnless(threading, 'Threading required for this test.')\r
-class TestPendingCalls(unittest.TestCase):\r
-\r
- def pendingcalls_submit(self, l, n):\r
- def callback():\r
- #this function can be interrupted by thread switching so let's\r
- #use an atomic operation\r
- l.append(None)\r
-\r
- for i in range(n):\r
- time.sleep(random.random()*0.02) #0.01 secs on average\r
- #try submitting callback until successful.\r
- #rely on regular interrupt to flush queue if we are\r
- #unsuccessful.\r
- while True:\r
- if _testcapi._pending_threadfunc(callback):\r
- break;\r
-\r
- def pendingcalls_wait(self, l, n, context = None):\r
- #now, stick around until l[0] has grown to 10\r
- count = 0;\r
- while len(l) != n:\r
- #this busy loop is where we expect to be interrupted to\r
- #run our callbacks. Note that callbacks are only run on the\r
- #main thread\r
- if False and test_support.verbose:\r
- print "(%i)"%(len(l),),\r
- for i in xrange(1000):\r
- a = i*i\r
- if context and not context.event.is_set():\r
- continue\r
- count += 1\r
- self.assertTrue(count < 10000,\r
- "timeout waiting for %i callbacks, got %i"%(n, len(l)))\r
- if False and test_support.verbose:\r
- print "(%i)"%(len(l),)\r
-\r
- def test_pendingcalls_threaded(self):\r
- #do every callback on a separate thread\r
- n = 32 #total callbacks\r
- threads = []\r
- class foo(object):pass\r
- context = foo()\r
- context.l = []\r
- context.n = 2 #submits per thread\r
- context.nThreads = n // context.n\r
- context.nFinished = 0\r
- context.lock = threading.Lock()\r
- context.event = threading.Event()\r
-\r
- for i in range(context.nThreads):\r
- t = threading.Thread(target=self.pendingcalls_thread, args = (context,))\r
- t.start()\r
- threads.append(t)\r
-\r
- self.pendingcalls_wait(context.l, n, context)\r
-\r
- for t in threads:\r
- t.join()\r
-\r
- def pendingcalls_thread(self, context):\r
- try:\r
- self.pendingcalls_submit(context.l, context.n)\r
- finally:\r
- with context.lock:\r
- context.nFinished += 1\r
- nFinished = context.nFinished\r
- if False and test_support.verbose:\r
- print "finished threads: ", nFinished\r
- if nFinished == context.nThreads:\r
- context.event.set()\r
-\r
- def test_pendingcalls_non_threaded(self):\r
- #again, just using the main thread, likely they will all be dispatched at\r
- #once. It is ok to ask for too many, because we loop until we find a slot.\r
- #the loop can be interrupted to dispatch.\r
- #there are only 32 dispatch slots, so we go for twice that!\r
- l = []\r
- n = 64\r
- self.pendingcalls_submit(l, n)\r
- self.pendingcalls_wait(l, n)\r
-\r
-\r
-def test_main():\r
-\r
- for name in dir(_testcapi):\r
- if name.startswith('test_'):\r
- test = getattr(_testcapi, name)\r
- if test_support.verbose:\r
- print "internal", name\r
- try:\r
- test()\r
- except _testcapi.error:\r
- raise test_support.TestFailed, sys.exc_info()[1]\r
-\r
- # some extra thread-state tests driven via _testcapi\r
- def TestThreadState():\r
- if test_support.verbose:\r
- print "auto-thread-state"\r
-\r
- idents = []\r
-\r
- def callback():\r
- idents.append(thread.get_ident())\r
-\r
- _testcapi._test_thread_state(callback)\r
- a = b = callback\r
- time.sleep(1)\r
- # Check our main thread is in the list exactly 3 times.\r
- if idents.count(thread.get_ident()) != 3:\r
- raise test_support.TestFailed, \\r
- "Couldn't find main thread correctly in the list"\r
-\r
- if threading:\r
- import thread\r
- import time\r
- TestThreadState()\r
- t=threading.Thread(target=TestThreadState)\r
- t.start()\r
- t.join()\r
-\r
- test_support.run_unittest(TestPendingCalls)\r
-\r
-if __name__ == "__main__":\r
- test_main()\r