]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_threadsignals.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_threadsignals.py
CommitLineData
4710c53d 1"""PyUnit testing that threads honor our signal semantics"""\r
2\r
3import unittest\r
4import signal\r
5import os\r
6import sys\r
7from test.test_support import run_unittest, import_module, reap_threads\r
8thread = import_module('thread')\r
9\r
10if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':\r
11 raise unittest.SkipTest, "Can't test signal on %s" % sys.platform\r
12\r
13process_pid = os.getpid()\r
14signalled_all=thread.allocate_lock()\r
15\r
16\r
17def registerSignals(for_usr1, for_usr2, for_alrm):\r
18 usr1 = signal.signal(signal.SIGUSR1, for_usr1)\r
19 usr2 = signal.signal(signal.SIGUSR2, for_usr2)\r
20 alrm = signal.signal(signal.SIGALRM, for_alrm)\r
21 return usr1, usr2, alrm\r
22\r
23\r
24# The signal handler. Just note that the signal occurred and\r
25# from who.\r
26def handle_signals(sig,frame):\r
27 signal_blackboard[sig]['tripped'] += 1\r
28 signal_blackboard[sig]['tripped_by'] = thread.get_ident()\r
29\r
30# a function that will be spawned as a separate thread.\r
31def send_signals():\r
32 os.kill(process_pid, signal.SIGUSR1)\r
33 os.kill(process_pid, signal.SIGUSR2)\r
34 signalled_all.release()\r
35\r
36class ThreadSignals(unittest.TestCase):\r
37 """Test signal handling semantics of threads.\r
38 We spawn a thread, have the thread send two signals, and\r
39 wait for it to finish. Check that we got both signals\r
40 and that they were run by the main thread.\r
41 """\r
42 @reap_threads\r
43 def test_signals(self):\r
44 signalled_all.acquire()\r
45 self.spawnSignallingThread()\r
46 signalled_all.acquire()\r
47 # the signals that we asked the kernel to send\r
48 # will come back, but we don't know when.\r
49 # (it might even be after the thread exits\r
50 # and might be out of order.) If we haven't seen\r
51 # the signals yet, send yet another signal and\r
52 # wait for it return.\r
53 if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \\r
54 or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:\r
55 signal.alarm(1)\r
56 signal.pause()\r
57 signal.alarm(0)\r
58\r
59 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)\r
60 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],\r
61 thread.get_ident())\r
62 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)\r
63 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],\r
64 thread.get_ident())\r
65 signalled_all.release()\r
66\r
67 def spawnSignallingThread(self):\r
68 thread.start_new_thread(send_signals, ())\r
69\r
70\r
71def test_main():\r
72 global signal_blackboard\r
73\r
74 signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },\r
75 signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },\r
76 signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }\r
77\r
78 oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)\r
79 try:\r
80 run_unittest(ThreadSignals)\r
81 finally:\r
82 registerSignals(*oldsigs)\r
83\r
84if __name__ == '__main__':\r
85 test_main()\r