]> git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_threaded_import.py
AppPkg/Applications/Python: Add Python 2.7.2 sources since the release of Python...
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_threaded_import.py
1 # This is a variant of the very old (early 90's) file
2 # Demo/threads/bug.py. It simply provokes a number of threads into
3 # trying to import the same module "at the same time".
4 # There are no pleasant failure modes -- most likely is that Python
5 # complains several times about module random having no attribute
6 # randrange, and then Python hangs.
7
8 import unittest
9 from test.test_support import verbose, TestFailed, import_module
10 thread = import_module('thread')
11
# Guards the shared countdown N, which every worker thread decrements in
# task().
critical_section = thread.allocate_lock()
# Held by test_main() while a batch of workers is running; the worker that
# brings N down to zero releases it, which lets test_main() continue.
done = thread.allocate_lock()
14
def task():
    """Worker body: import ``random`` concurrently, then check in.

    Each spawned thread imports ``random`` and calls ``random.randrange``;
    doing so from many threads at once is what this test provokes.  The
    thread then decrements the global countdown ``N`` while holding
    ``critical_section``.  The thread that brings ``N`` to zero releases
    ``done``.
    """
    global N, critical_section, done
    import random
    # Only the attribute lookup and call matter; the result is not used.
    random.randrange(1, 3)
    # Use the lock as a context manager so it is released even if the
    # countdown update raises; a leaked lock would hang every other worker.
    with critical_section:
        N -= 1
        finished = N == 0
    # Must release critical_section before releasing done, else the main
    # thread can exit and set critical_section to None as part of global
    # teardown; then critical_section.release() raises AttributeError.
    if finished:
        done.release()
28
29 def test_import_hangers():
30 import sys
31 if verbose:
32 print "testing import hangers ...",
33
34 import test.threaded_import_hangers
35 try:
36 if test.threaded_import_hangers.errors:
37 raise TestFailed(test.threaded_import_hangers.errors)
38 elif verbose:
39 print "OK."
40 finally:
41 # In case this test is run again, make sure the helper module
42 # gets loaded from scratch again.
43 del sys.modules['test.threaded_import_hangers']
44
45 # Tricky: When regrtest imports this module, the thread running regrtest
46 # grabs the import lock and won't let go of it until this module returns.
47 # All other threads attempting an import hang for the duration. Since
48 # this test spawns threads that do little *but* import, we can't do that
49 # successfully until after this module finishes importing and regrtest
50 # regains control. To make this work, a special case was added to
51 # regrtest to invoke a module's "test_main" function (if any) after
52 # importing it.
53
54 def test_main(): # magic name! see above
55 global N, done
56
57 import imp
58 if imp.lock_held():
59 # This triggers on, e.g., from test import autotest.
60 raise unittest.SkipTest("can't run when import lock is held")
61
62 done.acquire()
63 for N in (20, 50) * 3:
64 if verbose:
65 print "Trying", N, "threads ...",
66 for i in range(N):
67 thread.start_new_thread(task, ())
68 done.acquire()
69 if verbose:
70 print "OK."
71 done.release()
72
73 test_import_hangers()
74
# Run the test directly when this file is executed as a script.
if __name__ == "__main__":
    test_main()