]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.10/Lib/dummy_thread.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.10 / Lib / dummy_thread.py
CommitLineData
3257aa99
DM
"""Drop-in replacement for the thread module.

Meant to be used as a brain-dead substitute so that threaded code does
not need to be rewritten for when the thread module is not present.

Suggested usage is::

    try:
        import thread
    except ImportError:
        import dummy_thread as thread

"""
# Public names: only what the thread documentation specifies.  The
# obsolete synonyms allocate(), start_new() and exit_thread() are
# deliberately left out.
__all__ = [
    'error',
    'start_new_thread',
    'exit',
    'get_ident',
    'allocate_lock',
    'interrupt_main',
    'LockType',
]
18\r
19import traceback as _traceback\r
20\r
class error(Exception):
    """Stand-in for ``thread.error``.

    Instances simply carry whatever positional arguments they were
    constructed with in ``args``.
    """

    def __init__(self, *args):
        # Keep the positional arguments exactly as passed in.
        self.args = args
26\r
def start_new_thread(function, args, kwargs={}):
    """Dummy implementation of thread.start_new_thread().

    There is no real thread: ``function(*args, **kwargs)`` is called
    synchronously and this call returns (with ``None``) only once the
    function has finished.

    Compatibility is maintained by making sure that ``args`` is a
    tuple and ``kwargs`` is a dictionary (subclasses of either are
    accepted); anything else raises TypeError.  If the function raises
    SystemExit (which can be done by thread.exit()) it is caught and
    nothing is done; all other exceptions are printed out by using
    traceback.print_exc() and otherwise ignored.

    If the executed function calls interrupt_main the KeyboardInterrupt
    will be raised when the function returns.  When calls are nested
    (a "thread" starting another "thread"), the KeyboardInterrupt is
    held back until the outermost call returns, i.e. until control is
    back in the main code.

    """
    # isinstance() rather than an exact type comparison, so that tuple
    # and dict subclasses (e.g. namedtuples) are not rejected.
    if not isinstance(args, tuple):
        raise TypeError("2nd arg must be a tuple")
    if not isinstance(kwargs, dict):
        raise TypeError("3rd arg must be a dict")
    # NOTE: the ``kwargs={}`` default is kept for interface compatibility;
    # it is never mutated because ``**kwargs`` below passes a copy.
    global _main, _interrupt
    # Remember whether we were already inside a "thread" so that a nested
    # call does not mark the still-running outer "thread" as main code.
    previously_main = _main
    _main = False
    try:
        try:
            function(*args, **kwargs)
        except SystemExit:
            # thread.exit() ends the "thread" silently.
            pass
        except:
            # Deliberately broad: any failure of the "thread" function is
            # reported but must not propagate into the caller.
            _traceback.print_exc()
    finally:
        # Restore the flag even if printing the traceback itself failed.
        _main = previously_main
    # Deliver a pending interrupt_main() request only once we are back in
    # the main code; raising it inside an enclosing "thread" would just be
    # swallowed by that call's exception handler.
    if _main and _interrupt:
        _interrupt = False
        raise KeyboardInterrupt
57\r
def exit():
    """Dummy implementation of thread.exit(): always raises SystemExit."""
    raise SystemExit()
61\r
def get_ident():
    """Dummy implementation of thread.get_ident().

    This module is only meant to be used when threadmodule is not
    available, so the current process can be assumed to be the only
    thread and one fixed identifier is enough.  Always returns -1.
    """
    return -1
70\r
def allocate_lock():
    """Dummy implementation of thread.allocate_lock().

    Returns a freshly constructed LockType instance.
    """
    new_lock = LockType()
    return new_lock
74\r
def stack_size(size=None):
    """Dummy implementation of thread.stack_size().

    Called without a size (or with ``None``) it returns 0.  Any other
    value, including 0, raises ``error`` because the stack size cannot
    be set here.
    """
    if size is None:
        return 0
    raise error("setting thread stack size not supported")
80\r
class LockType(object):
    """Class implementing dummy implementation of thread.LockType.

    The whole state of the lock is the boolean ``self.locked_status``.
    Instances should not be pickled: if an unpickled lock from here were
    then used together with the real thread module, problems could occur
    because the methods of this class are not atomic.

    """

    def __init__(self):
        # A new lock starts out unlocked.
        self.locked_status = False

    def acquire(self, waitflag=None):
        """Dummy implementation of acquire().

        A blocking call (``waitflag`` omitted, ``None`` or true) always
        marks the lock as held and returns True, even when it was held
        already.  A non-blocking call (``waitflag`` false) really checks
        the state: it takes the lock and returns True when it is free,
        and returns False without touching it otherwise.  This is all
        done so that threading.Condition's assert statements aren't
        triggered.

        """
        blocking = waitflag is None or waitflag
        if not blocking and self.locked_status:
            # Non-blocking attempt on a lock that is already held.
            return False
        self.locked_status = True
        return True

    # Entering a ``with`` block is a plain blocking acquire.
    __enter__ = acquire

    def __exit__(self, typ, val, tb):
        # Leaving the ``with`` block releases the lock; nothing is
        # returned, so an in-flight exception is not suppressed.
        self.release()

    def release(self):
        """Release the dummy lock.

        Returns True on success; raises ``error`` when the lock is not
        currently held.
        """
        # XXX Perhaps shouldn't actually bother to test?  Could lead
        #     to problems for complex, threaded code.
        if self.locked_status:
            self.locked_status = False
            return True
        raise error

    def locked(self):
        """Return the current value of ``self.locked_status``."""
        return self.locked_status
132\r
# Set to True by interrupt_main() when it is called from inside a
# "thread"; start_new_thread() checks and clears it.
_interrupt = False
# True while no "thread" function is being run by start_new_thread().
_main = True


def interrupt_main():
    """Raise KeyboardInterrupt, now or when the current "thread" ends.

    When called from the main code the KeyboardInterrupt is raised
    immediately.  When called from inside a "thread" the _interrupt
    flag is set to True instead, so that start_new_thread raises
    KeyboardInterrupt upon exiting.
    """
    global _interrupt
    if not _main:
        _interrupt = True
        return
    raise KeyboardInterrupt