+++ /dev/null
-import gc\r
-import os\r
-import sys\r
-import signal\r
-import weakref\r
-\r
-from cStringIO import StringIO\r
-\r
-\r
-import unittest\r
-\r
-\r
-@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")\r
-@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")\r
-@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "\r
- "if threads have been used")\r
-class TestBreak(unittest.TestCase):\r
-\r
- def setUp(self):\r
- self._default_handler = signal.getsignal(signal.SIGINT)\r
-\r
- def tearDown(self):\r
- signal.signal(signal.SIGINT, self._default_handler)\r
- unittest.signals._results = weakref.WeakKeyDictionary()\r
- unittest.signals._interrupt_handler = None\r
-\r
-\r
- def testInstallHandler(self):\r
- default_handler = signal.getsignal(signal.SIGINT)\r
- unittest.installHandler()\r
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r
-\r
- try:\r
- pid = os.getpid()\r
- os.kill(pid, signal.SIGINT)\r
- except KeyboardInterrupt:\r
- self.fail("KeyboardInterrupt not handled")\r
-\r
- self.assertTrue(unittest.signals._interrupt_handler.called)\r
-\r
- def testRegisterResult(self):\r
- result = unittest.TestResult()\r
- unittest.registerResult(result)\r
-\r
- for ref in unittest.signals._results:\r
- if ref is result:\r
- break\r
- elif ref is not result:\r
- self.fail("odd object in result set")\r
- else:\r
- self.fail("result not found")\r
-\r
-\r
- def testInterruptCaught(self):\r
- default_handler = signal.getsignal(signal.SIGINT)\r
-\r
- result = unittest.TestResult()\r
- unittest.installHandler()\r
- unittest.registerResult(result)\r
-\r
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r
-\r
- def test(result):\r
- pid = os.getpid()\r
- os.kill(pid, signal.SIGINT)\r
- result.breakCaught = True\r
- self.assertTrue(result.shouldStop)\r
-\r
- try:\r
- test(result)\r
- except KeyboardInterrupt:\r
- self.fail("KeyboardInterrupt not handled")\r
- self.assertTrue(result.breakCaught)\r
-\r
-\r
- def testSecondInterrupt(self):\r
- result = unittest.TestResult()\r
- unittest.installHandler()\r
- unittest.registerResult(result)\r
-\r
- def test(result):\r
- pid = os.getpid()\r
- os.kill(pid, signal.SIGINT)\r
- result.breakCaught = True\r
- self.assertTrue(result.shouldStop)\r
- os.kill(pid, signal.SIGINT)\r
- self.fail("Second KeyboardInterrupt not raised")\r
-\r
- try:\r
- test(result)\r
- except KeyboardInterrupt:\r
- pass\r
- else:\r
- self.fail("Second KeyboardInterrupt not raised")\r
- self.assertTrue(result.breakCaught)\r
-\r
-\r
- def testTwoResults(self):\r
- unittest.installHandler()\r
-\r
- result = unittest.TestResult()\r
- unittest.registerResult(result)\r
- new_handler = signal.getsignal(signal.SIGINT)\r
-\r
- result2 = unittest.TestResult()\r
- unittest.registerResult(result2)\r
- self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)\r
-\r
- result3 = unittest.TestResult()\r
-\r
- def test(result):\r
- pid = os.getpid()\r
- os.kill(pid, signal.SIGINT)\r
-\r
- try:\r
- test(result)\r
- except KeyboardInterrupt:\r
- self.fail("KeyboardInterrupt not handled")\r
-\r
- self.assertTrue(result.shouldStop)\r
- self.assertTrue(result2.shouldStop)\r
- self.assertFalse(result3.shouldStop)\r
-\r
-\r
- def testHandlerReplacedButCalled(self):\r
- # If our handler has been replaced (is no longer installed) but is\r
- # called by the *new* handler, then it isn't safe to delay the\r
- # SIGINT and we should immediately delegate to the default handler\r
- unittest.installHandler()\r
-\r
- handler = signal.getsignal(signal.SIGINT)\r
- def new_handler(frame, signum):\r
- handler(frame, signum)\r
- signal.signal(signal.SIGINT, new_handler)\r
-\r
- try:\r
- pid = os.getpid()\r
- os.kill(pid, signal.SIGINT)\r
- except KeyboardInterrupt:\r
- pass\r
- else:\r
- self.fail("replaced but delegated handler doesn't raise interrupt")\r
-\r
- def testRunner(self):\r
- # Creating a TextTestRunner with the appropriate argument should\r
- # register the TextTestResult it creates\r
- runner = unittest.TextTestRunner(stream=StringIO())\r
-\r
- result = runner.run(unittest.TestSuite())\r
- self.assertIn(result, unittest.signals._results)\r
-\r
- def testWeakReferences(self):\r
- # Calling registerResult on a result should not keep it alive\r
- result = unittest.TestResult()\r
- unittest.registerResult(result)\r
-\r
- ref = weakref.ref(result)\r
- del result\r
-\r
- # For non-reference counting implementations\r
- gc.collect();gc.collect()\r
- self.assertIsNone(ref())\r
-\r
-\r
- def testRemoveResult(self):\r
- result = unittest.TestResult()\r
- unittest.registerResult(result)\r
-\r
- unittest.installHandler()\r
- self.assertTrue(unittest.removeResult(result))\r
-\r
- # Should this raise an error instead?\r
- self.assertFalse(unittest.removeResult(unittest.TestResult()))\r
-\r
- try:\r
- pid = os.getpid()\r
- os.kill(pid, signal.SIGINT)\r
- except KeyboardInterrupt:\r
- pass\r
-\r
- self.assertFalse(result.shouldStop)\r
-\r
- def testMainInstallsHandler(self):\r
- failfast = object()\r
- test = object()\r
- verbosity = object()\r
- result = object()\r
- default_handler = signal.getsignal(signal.SIGINT)\r
-\r
- class FakeRunner(object):\r
- initArgs = []\r
- runArgs = []\r
- def __init__(self, *args, **kwargs):\r
- self.initArgs.append((args, kwargs))\r
- def run(self, test):\r
- self.runArgs.append(test)\r
- return result\r
-\r
- class Program(unittest.TestProgram):\r
- def __init__(self, catchbreak):\r
- self.exit = False\r
- self.verbosity = verbosity\r
- self.failfast = failfast\r
- self.catchbreak = catchbreak\r
- self.testRunner = FakeRunner\r
- self.test = test\r
- self.result = None\r
-\r
- p = Program(False)\r
- p.runTests()\r
-\r
- self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,\r
- 'verbosity': verbosity,\r
- 'failfast': failfast})])\r
- self.assertEqual(FakeRunner.runArgs, [test])\r
- self.assertEqual(p.result, result)\r
-\r
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r
-\r
- FakeRunner.initArgs = []\r
- FakeRunner.runArgs = []\r
- p = Program(True)\r
- p.runTests()\r
-\r
- self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,\r
- 'verbosity': verbosity,\r
- 'failfast': failfast})])\r
- self.assertEqual(FakeRunner.runArgs, [test])\r
- self.assertEqual(p.result, result)\r
-\r
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r
-\r
- def testRemoveHandler(self):\r
- default_handler = signal.getsignal(signal.SIGINT)\r
- unittest.installHandler()\r
- unittest.removeHandler()\r
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r
-\r
- # check that calling removeHandler multiple times has no ill-effect\r
- unittest.removeHandler()\r
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r
-\r
- def testRemoveHandlerAsDecorator(self):\r
- default_handler = signal.getsignal(signal.SIGINT)\r
- unittest.installHandler()\r
-\r
- @unittest.removeHandler\r
- def test():\r
- self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)\r
-\r
- test()\r
- self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)\r