+++ /dev/null
-import socket\r
-import telnetlib\r
-import time\r
-import Queue\r
-\r
-from unittest import TestCase\r
-from test import test_support\r
-threading = test_support.import_module('threading')\r
-\r
-HOST = test_support.HOST\r
-EOF_sigil = object()\r
-\r
-def server(evt, serv, dataq=None):\r
- """ Open a tcp server in three steps\r
- 1) set evt to true to let the parent know we are ready\r
- 2) [optional] if is not False, write the list of data from dataq.get()\r
- to the socket.\r
- 3) set evt to true to let the parent know we're done\r
- """\r
- serv.listen(5)\r
- evt.set()\r
- try:\r
- conn, addr = serv.accept()\r
- if dataq:\r
- data = ''\r
- new_data = dataq.get(True, 0.5)\r
- dataq.task_done()\r
- for item in new_data:\r
- if item == EOF_sigil:\r
- break\r
- if type(item) in [int, float]:\r
- time.sleep(item)\r
- else:\r
- data += item\r
- written = conn.send(data)\r
- data = data[written:]\r
- except socket.timeout:\r
- pass\r
- else:\r
- conn.close()\r
- finally:\r
- serv.close()\r
- evt.set()\r
-\r
-class GeneralTests(TestCase):\r
-\r
- def setUp(self):\r
- self.evt = threading.Event()\r
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
- self.sock.settimeout(3)\r
- self.port = test_support.bind_port(self.sock)\r
- self.thread = threading.Thread(target=server, args=(self.evt,self.sock))\r
- self.thread.start()\r
- self.evt.wait()\r
- self.evt.clear()\r
- time.sleep(.1)\r
-\r
- def tearDown(self):\r
- self.evt.wait()\r
- self.thread.join()\r
-\r
- def testBasic(self):\r
- # connects\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- telnet.sock.close()\r
-\r
- def testTimeoutDefault(self):\r
- self.assertTrue(socket.getdefaulttimeout() is None)\r
- socket.setdefaulttimeout(30)\r
- try:\r
- telnet = telnetlib.Telnet("localhost", self.port)\r
- finally:\r
- socket.setdefaulttimeout(None)\r
- self.assertEqual(telnet.sock.gettimeout(), 30)\r
- telnet.sock.close()\r
-\r
- def testTimeoutNone(self):\r
- # None, having other default\r
- self.assertTrue(socket.getdefaulttimeout() is None)\r
- socket.setdefaulttimeout(30)\r
- try:\r
- telnet = telnetlib.Telnet(HOST, self.port, timeout=None)\r
- finally:\r
- socket.setdefaulttimeout(None)\r
- self.assertTrue(telnet.sock.gettimeout() is None)\r
- telnet.sock.close()\r
-\r
- def testTimeoutValue(self):\r
- telnet = telnetlib.Telnet("localhost", self.port, timeout=30)\r
- self.assertEqual(telnet.sock.gettimeout(), 30)\r
- telnet.sock.close()\r
-\r
- def testTimeoutOpen(self):\r
- telnet = telnetlib.Telnet()\r
- telnet.open("localhost", self.port, timeout=30)\r
- self.assertEqual(telnet.sock.gettimeout(), 30)\r
- telnet.sock.close()\r
-\r
-def _read_setUp(self):\r
- self.evt = threading.Event()\r
- self.dataq = Queue.Queue()\r
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
- self.sock.settimeout(3)\r
- self.port = test_support.bind_port(self.sock)\r
- self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))\r
- self.thread.start()\r
- self.evt.wait()\r
- self.evt.clear()\r
- time.sleep(.1)\r
-\r
-def _read_tearDown(self):\r
- self.evt.wait()\r
- self.thread.join()\r
-\r
-class ReadTests(TestCase):\r
- setUp = _read_setUp\r
- tearDown = _read_tearDown\r
-\r
- # use a similar approach to testing timeouts as test_timeout.py\r
- # these will never pass 100% but make the fuzz big enough that it is rare\r
- block_long = 0.6\r
- block_short = 0.3\r
- def test_read_until_A(self):\r
- """\r
- read_until(expected, [timeout])\r
- Read until the expected string has been seen, or a timeout is\r
- hit (default is no timeout); may block.\r
- """\r
- want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]\r
- self.dataq.put(want)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- data = telnet.read_until('match')\r
- self.assertEqual(data, ''.join(want[:-2]))\r
-\r
- def test_read_until_B(self):\r
- # test the timeout - it does NOT raise socket.timeout\r
- want = ['hello', self.block_long, 'not seen', EOF_sigil]\r
- self.dataq.put(want)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- data = telnet.read_until('not seen', self.block_short)\r
- self.assertEqual(data, want[0])\r
- self.assertEqual(telnet.read_all(), 'not seen')\r
-\r
- def test_read_all_A(self):\r
- """\r
- read_all()\r
- Read all data until EOF; may block.\r
- """\r
- want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]\r
- self.dataq.put(want)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- data = telnet.read_all()\r
- self.assertEqual(data, ''.join(want[:-1]))\r
- return\r
-\r
- def _test_blocking(self, func):\r
- self.dataq.put([self.block_long, EOF_sigil])\r
- self.dataq.join()\r
- start = time.time()\r
- data = func()\r
- self.assertTrue(self.block_short <= time.time() - start)\r
-\r
- def test_read_all_B(self):\r
- self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)\r
-\r
- def test_read_all_C(self):\r
- self.dataq.put([EOF_sigil])\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- telnet.read_all()\r
- telnet.read_all() # shouldn't raise\r
-\r
- def test_read_some_A(self):\r
- """\r
- read_some()\r
- Read at least one byte or EOF; may block.\r
- """\r
- # test 'at least one byte'\r
- want = ['x' * 500, EOF_sigil]\r
- self.dataq.put(want)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- data = telnet.read_all()\r
- self.assertTrue(len(data) >= 1)\r
-\r
- def test_read_some_B(self):\r
- # test EOF\r
- self.dataq.put([EOF_sigil])\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- self.assertEqual('', telnet.read_some())\r
-\r
- def test_read_some_C(self):\r
- self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)\r
-\r
- def _test_read_any_eager_A(self, func_name):\r
- """\r
- read_very_eager()\r
- Read all data available already queued or on the socket,\r
- without blocking.\r
- """\r
- want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]\r
- expects = want[1] + want[2]\r
- self.dataq.put(want)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- func = getattr(telnet, func_name)\r
- data = ''\r
- while True:\r
- try:\r
- data += func()\r
- self.assertTrue(expects.startswith(data))\r
- except EOFError:\r
- break\r
- self.assertEqual(expects, data)\r
-\r
- def _test_read_any_eager_B(self, func_name):\r
- # test EOF\r
- self.dataq.put([EOF_sigil])\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- time.sleep(self.block_short)\r
- func = getattr(telnet, func_name)\r
- self.assertRaises(EOFError, func)\r
-\r
- # read_eager and read_very_eager make the same gaurantees\r
- # (they behave differently but we only test the gaurantees)\r
- def test_read_very_eager_A(self):\r
- self._test_read_any_eager_A('read_very_eager')\r
- def test_read_very_eager_B(self):\r
- self._test_read_any_eager_B('read_very_eager')\r
- def test_read_eager_A(self):\r
- self._test_read_any_eager_A('read_eager')\r
- def test_read_eager_B(self):\r
- self._test_read_any_eager_B('read_eager')\r
- # NB -- we need to test the IAC block which is mentioned in the docstring\r
- # but not in the module docs\r
-\r
- def _test_read_any_lazy_B(self, func_name):\r
- self.dataq.put([EOF_sigil])\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- func = getattr(telnet, func_name)\r
- telnet.fill_rawq()\r
- self.assertRaises(EOFError, func)\r
-\r
- def test_read_lazy_A(self):\r
- want = ['x' * 100, EOF_sigil]\r
- self.dataq.put(want)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- time.sleep(self.block_short)\r
- self.assertEqual('', telnet.read_lazy())\r
- data = ''\r
- while True:\r
- try:\r
- read_data = telnet.read_lazy()\r
- data += read_data\r
- if not read_data:\r
- telnet.fill_rawq()\r
- except EOFError:\r
- break\r
- self.assertTrue(want[0].startswith(data))\r
- self.assertEqual(data, want[0])\r
-\r
- def test_read_lazy_B(self):\r
- self._test_read_any_lazy_B('read_lazy')\r
-\r
- def test_read_very_lazy_A(self):\r
- want = ['x' * 100, EOF_sigil]\r
- self.dataq.put(want)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- time.sleep(self.block_short)\r
- self.assertEqual('', telnet.read_very_lazy())\r
- data = ''\r
- while True:\r
- try:\r
- read_data = telnet.read_very_lazy()\r
- except EOFError:\r
- break\r
- data += read_data\r
- if not read_data:\r
- telnet.fill_rawq()\r
- self.assertEqual('', telnet.cookedq)\r
- telnet.process_rawq()\r
- self.assertTrue(want[0].startswith(data))\r
- self.assertEqual(data, want[0])\r
-\r
- def test_read_very_lazy_B(self):\r
- self._test_read_any_lazy_B('read_very_lazy')\r
-\r
-class nego_collector(object):\r
- def __init__(self, sb_getter=None):\r
- self.seen = ''\r
- self.sb_getter = sb_getter\r
- self.sb_seen = ''\r
-\r
- def do_nego(self, sock, cmd, opt):\r
- self.seen += cmd + opt\r
- if cmd == tl.SE and self.sb_getter:\r
- sb_data = self.sb_getter()\r
- self.sb_seen += sb_data\r
-\r
-tl = telnetlib\r
-class OptionTests(TestCase):\r
- setUp = _read_setUp\r
- tearDown = _read_tearDown\r
- # RFC 854 commands\r
- cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]\r
-\r
- def _test_command(self, data):\r
- """ helper for testing IAC + cmd """\r
- self.setUp()\r
- self.dataq.put(data)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- nego = nego_collector()\r
- telnet.set_option_negotiation_callback(nego.do_nego)\r
- txt = telnet.read_all()\r
- cmd = nego.seen\r
- self.assertTrue(len(cmd) > 0) # we expect at least one command\r
- self.assertIn(cmd[0], self.cmds)\r
- self.assertEqual(cmd[1], tl.NOOPT)\r
- self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))\r
- nego.sb_getter = None # break the nego => telnet cycle\r
- self.tearDown()\r
-\r
- def test_IAC_commands(self):\r
- # reset our setup\r
- self.dataq.put([EOF_sigil])\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- self.tearDown()\r
-\r
- for cmd in self.cmds:\r
- self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])\r
- self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])\r
- self._test_command([tl.IAC + cmd, EOF_sigil])\r
- # all at once\r
- self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])\r
- self.assertEqual('', telnet.read_sb_data())\r
-\r
- def test_SB_commands(self):\r
- # RFC 855, subnegotiations portion\r
- send = [tl.IAC + tl.SB + tl.IAC + tl.SE,\r
- tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,\r
- tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,\r
- tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,\r
- tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,\r
- EOF_sigil,\r
- ]\r
- self.dataq.put(send)\r
- telnet = telnetlib.Telnet(HOST, self.port)\r
- self.dataq.join()\r
- nego = nego_collector(telnet.read_sb_data)\r
- telnet.set_option_negotiation_callback(nego.do_nego)\r
- txt = telnet.read_all()\r
- self.assertEqual(txt, '')\r
- want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'\r
- self.assertEqual(nego.sb_seen, want_sb_data)\r
- self.assertEqual('', telnet.read_sb_data())\r
- nego.sb_getter = None # break the nego => telnet cycle\r
-\r
-def test_main(verbose=None):\r
- test_support.run_unittest(GeneralTests, ReadTests, OptionTests)\r
-\r
-if __name__ == '__main__':\r
- test_main()\r