]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | """PyUnit testing that threads honor our signal semantics"""\r |
2 | \r | |
3 | import unittest\r | |
4 | import signal\r | |
5 | import os\r | |
6 | import sys\r | |
7 | from test.test_support import run_unittest, import_module, reap_threads\r | |
8 | thread = import_module('thread')\r | |
9 | \r | |
10 | if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':\r | |
11 | raise unittest.SkipTest, "Can't test signal on %s" % sys.platform\r | |
12 | \r | |
13 | process_pid = os.getpid()\r | |
14 | signalled_all=thread.allocate_lock()\r | |
15 | \r | |
16 | \r | |
17 | def registerSignals(for_usr1, for_usr2, for_alrm):\r | |
18 | usr1 = signal.signal(signal.SIGUSR1, for_usr1)\r | |
19 | usr2 = signal.signal(signal.SIGUSR2, for_usr2)\r | |
20 | alrm = signal.signal(signal.SIGALRM, for_alrm)\r | |
21 | return usr1, usr2, alrm\r | |
22 | \r | |
23 | \r | |
24 | # The signal handler. Just note that the signal occurred and\r | |
25 | # from who.\r | |
26 | def handle_signals(sig,frame):\r | |
27 | signal_blackboard[sig]['tripped'] += 1\r | |
28 | signal_blackboard[sig]['tripped_by'] = thread.get_ident()\r | |
29 | \r | |
30 | # a function that will be spawned as a separate thread.\r | |
31 | def send_signals():\r | |
32 | os.kill(process_pid, signal.SIGUSR1)\r | |
33 | os.kill(process_pid, signal.SIGUSR2)\r | |
34 | signalled_all.release()\r | |
35 | \r | |
36 | class ThreadSignals(unittest.TestCase):\r | |
37 | """Test signal handling semantics of threads.\r | |
38 | We spawn a thread, have the thread send two signals, and\r | |
39 | wait for it to finish. Check that we got both signals\r | |
40 | and that they were run by the main thread.\r | |
41 | """\r | |
42 | @reap_threads\r | |
43 | def test_signals(self):\r | |
44 | signalled_all.acquire()\r | |
45 | self.spawnSignallingThread()\r | |
46 | signalled_all.acquire()\r | |
47 | # the signals that we asked the kernel to send\r | |
48 | # will come back, but we don't know when.\r | |
49 | # (it might even be after the thread exits\r | |
50 | # and might be out of order.) If we haven't seen\r | |
51 | # the signals yet, send yet another signal and\r | |
52 | # wait for it return.\r | |
53 | if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \\r | |
54 | or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:\r | |
55 | signal.alarm(1)\r | |
56 | signal.pause()\r | |
57 | signal.alarm(0)\r | |
58 | \r | |
59 | self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)\r | |
60 | self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],\r | |
61 | thread.get_ident())\r | |
62 | self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1)\r | |
63 | self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'],\r | |
64 | thread.get_ident())\r | |
65 | signalled_all.release()\r | |
66 | \r | |
67 | def spawnSignallingThread(self):\r | |
68 | thread.start_new_thread(send_signals, ())\r | |
69 | \r | |
70 | \r | |
71 | def test_main():\r | |
72 | global signal_blackboard\r | |
73 | \r | |
74 | signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 },\r | |
75 | signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 },\r | |
76 | signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } }\r | |
77 | \r | |
78 | oldsigs = registerSignals(handle_signals, handle_signals, handle_signals)\r | |
79 | try:\r | |
80 | run_unittest(ThreadSignals)\r | |
81 | finally:\r | |
82 | registerSignals(*oldsigs)\r | |
83 | \r | |
84 | if __name__ == '__main__':\r | |
85 | test_main()\r |