]> git.proxmox.com Git - mirror_edk2.git/blobdiff - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_capi.py
edk2: Remove AppPkg, StdLib, StdLibPrivateInternalFiles
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_capi.py
diff --git a/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_capi.py b/AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_capi.py
deleted file mode 100644 (file)
index 5013d34..0000000
+++ /dev/null
@@ -1,140 +0,0 @@
-# Run the _testcapi module tests (tests for the Python/C API):  by defn,\r
-# these are all functions _testcapi exports whose name begins with 'test_'.\r
-\r
-from __future__ import with_statement\r
-import sys\r
-import time\r
-import random\r
-import unittest\r
-from test import test_support\r
-try:\r
-    import threading\r
-except ImportError:\r
-    threading = None\r
-import _testcapi\r
-\r
-@unittest.skipUnless(threading, 'Threading required for this test.')\r
-class TestPendingCalls(unittest.TestCase):\r
-\r
-    def pendingcalls_submit(self, l, n):\r
-        def callback():\r
-            #this function can be interrupted by thread switching so let's\r
-            #use an atomic operation\r
-            l.append(None)\r
-\r
-        for i in range(n):\r
-            time.sleep(random.random()*0.02) #0.01 secs on average\r
-            #try submitting callback until successful.\r
-            #rely on regular interrupt to flush queue if we are\r
-            #unsuccessful.\r
-            while True:\r
-                if _testcapi._pending_threadfunc(callback):\r
-                    break;\r
-\r
-    def pendingcalls_wait(self, l, n, context = None):\r
-        #now, stick around until l[0] has grown to 10\r
-        count = 0;\r
-        while len(l) != n:\r
-            #this busy loop is where we expect to be interrupted to\r
-            #run our callbacks.  Note that callbacks are only run on the\r
-            #main thread\r
-            if False and test_support.verbose:\r
-                print "(%i)"%(len(l),),\r
-            for i in xrange(1000):\r
-                a = i*i\r
-            if context and not context.event.is_set():\r
-                continue\r
-            count += 1\r
-            self.assertTrue(count < 10000,\r
-                "timeout waiting for %i callbacks, got %i"%(n, len(l)))\r
-        if False and test_support.verbose:\r
-            print "(%i)"%(len(l),)\r
-\r
-    def test_pendingcalls_threaded(self):\r
-        #do every callback on a separate thread\r
-        n = 32 #total callbacks\r
-        threads = []\r
-        class foo(object):pass\r
-        context = foo()\r
-        context.l = []\r
-        context.n = 2 #submits per thread\r
-        context.nThreads = n // context.n\r
-        context.nFinished = 0\r
-        context.lock = threading.Lock()\r
-        context.event = threading.Event()\r
-\r
-        for i in range(context.nThreads):\r
-            t = threading.Thread(target=self.pendingcalls_thread, args = (context,))\r
-            t.start()\r
-            threads.append(t)\r
-\r
-        self.pendingcalls_wait(context.l, n, context)\r
-\r
-        for t in threads:\r
-            t.join()\r
-\r
-    def pendingcalls_thread(self, context):\r
-        try:\r
-            self.pendingcalls_submit(context.l, context.n)\r
-        finally:\r
-            with context.lock:\r
-                context.nFinished += 1\r
-                nFinished = context.nFinished\r
-                if False and test_support.verbose:\r
-                    print "finished threads: ", nFinished\r
-            if nFinished == context.nThreads:\r
-                context.event.set()\r
-\r
-    def test_pendingcalls_non_threaded(self):\r
-        #again, just using the main thread, likely they will all be dispatched at\r
-        #once.  It is ok to ask for too many, because we loop until we find a slot.\r
-        #the loop can be interrupted to dispatch.\r
-        #there are only 32 dispatch slots, so we go for twice that!\r
-        l = []\r
-        n = 64\r
-        self.pendingcalls_submit(l, n)\r
-        self.pendingcalls_wait(l, n)\r
-\r
-\r
-def test_main():\r
-\r
-    for name in dir(_testcapi):\r
-        if name.startswith('test_'):\r
-            test = getattr(_testcapi, name)\r
-            if test_support.verbose:\r
-                print "internal", name\r
-            try:\r
-                test()\r
-            except _testcapi.error:\r
-                raise test_support.TestFailed, sys.exc_info()[1]\r
-\r
-    # some extra thread-state tests driven via _testcapi\r
-    def TestThreadState():\r
-        if test_support.verbose:\r
-            print "auto-thread-state"\r
-\r
-        idents = []\r
-\r
-        def callback():\r
-            idents.append(thread.get_ident())\r
-\r
-        _testcapi._test_thread_state(callback)\r
-        a = b = callback\r
-        time.sleep(1)\r
-        # Check our main thread is in the list exactly 3 times.\r
-        if idents.count(thread.get_ident()) != 3:\r
-            raise test_support.TestFailed, \\r
-                  "Couldn't find main thread correctly in the list"\r
-\r
-    if threading:\r
-        import thread\r
-        import time\r
-        TestThreadState()\r
-        t=threading.Thread(target=TestThreadState)\r
-        t.start()\r
-        t.join()\r
-\r
-    test_support.run_unittest(TestPendingCalls)\r
-\r
-if __name__ == "__main__":\r
-    test_main()\r