]>
git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_telnetlib.py
6 from unittest
import TestCase
7 from test
import test_support
# Obtain the threading module through test_support.import_module rather than
# a plain import statement.
threading = test_support.import_module('threading')

# Host name the tests hand to telnetlib.Telnet, taken from test_support.
HOST = test_support.HOST
13 def server(evt
, serv
, dataq
=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
16 2) [optional] if is not False, write the list of data from dataq.get()
18 3) set evt to true to let the parent know we're done
23 conn
, addr
= serv
.accept()
26 new_data
= dataq
.get(True, 0.5)
31 if type(item
) in [int, float]:
35 written
= conn
.send(data
)
37 except socket
.timeout
:
45 class GeneralTests(TestCase
):
48 self
.evt
= threading
.Event()
49 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
50 self
.sock
.settimeout(3)
51 self
.port
= test_support
.bind_port(self
.sock
)
52 self
.thread
= threading
.Thread(target
=server
, args
=(self
.evt
,self
.sock
))
64 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
67 def testTimeoutDefault(self
):
68 self
.assertTrue(socket
.getdefaulttimeout() is None)
69 socket
.setdefaulttimeout(30)
71 telnet
= telnetlib
.Telnet("localhost", self
.port
)
73 socket
.setdefaulttimeout(None)
74 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
77 def testTimeoutNone(self
):
78 # None, having other default
79 self
.assertTrue(socket
.getdefaulttimeout() is None)
80 socket
.setdefaulttimeout(30)
82 telnet
= telnetlib
.Telnet(HOST
, self
.port
, timeout
=None)
84 socket
.setdefaulttimeout(None)
85 self
.assertTrue(telnet
.sock
.gettimeout() is None)
88 def testTimeoutValue(self
):
89 telnet
= telnetlib
.Telnet("localhost", self
.port
, timeout
=30)
90 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
93 def testTimeoutOpen(self
):
94 telnet
= telnetlib
.Telnet()
95 telnet
.open("localhost", self
.port
, timeout
=30)
96 self
.assertEqual(telnet
.sock
.gettimeout(), 30)
99 def _read_setUp(self
):
100 self
.evt
= threading
.Event()
101 self
.dataq
= Queue
.Queue()
102 self
.sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
103 self
.sock
.settimeout(3)
104 self
.port
= test_support
.bind_port(self
.sock
)
105 self
.thread
= threading
.Thread(target
=server
, args
=(self
.evt
,self
.sock
, self
.dataq
))
111 def _read_tearDown(self
):
115 class ReadTests(TestCase
):
117 tearDown
= _read_tearDown
119 # use a similar approach to testing timeouts as test_timeout.py
120 # these will never pass 100% but make the fuzz big enough that it is rare
123 def test_read_until_A(self
):
125 read_until(expected, [timeout])
126 Read until the expected string has been seen, or a timeout is
127 hit (default is no timeout); may block.
129 want
= ['x' * 10, 'match', 'y' * 10, EOF_sigil
]
131 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
133 data
= telnet
.read_until('match')
134 self
.assertEqual(data
, ''.join(want
[:-2]))
136 def test_read_until_B(self
):
137 # test the timeout - it does NOT raise socket.timeout
138 want
= ['hello', self
.block_long
, 'not seen', EOF_sigil
]
140 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
142 data
= telnet
.read_until('not seen', self
.block_short
)
143 self
.assertEqual(data
, want
[0])
144 self
.assertEqual(telnet
.read_all(), 'not seen')
146 def test_read_all_A(self
):
149 Read all data until EOF; may block.
151 want
= ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil
]
153 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
155 data
= telnet
.read_all()
156 self
.assertEqual(data
, ''.join(want
[:-1]))
159 def _test_blocking(self
, func
):
160 self
.dataq
.put([self
.block_long
, EOF_sigil
])
164 self
.assertTrue(self
.block_short
<= time
.time() - start
)
def test_read_all_B(self):
    # Hand the bound read_all method of a fresh connection to the shared
    # _test_blocking helper, which is responsible for calling it.
    self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
169 def test_read_all_C(self
):
170 self
.dataq
.put([EOF_sigil
])
171 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
174 telnet
.read_all() # shouldn't raise
176 def test_read_some_A(self
):
179 Read at least one byte or EOF; may block.
181 # test 'at least one byte'
182 want
= ['x' * 500, EOF_sigil
]
184 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
186 data
= telnet
.read_all()
187 self
.assertTrue(len(data
) >= 1)
189 def test_read_some_B(self
):
191 self
.dataq
.put([EOF_sigil
])
192 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
194 self
.assertEqual('', telnet
.read_some())
def test_read_some_C(self):
    # Hand the bound read_some method of a fresh connection to the shared
    # _test_blocking helper, which is responsible for calling it.
    self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
199 def _test_read_any_eager_A(self
, func_name
):
202 Read all data available already queued or on the socket,
205 want
= [self
.block_long
, 'x' * 100, 'y' * 100, EOF_sigil
]
206 expects
= want
[1] + want
[2]
208 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
210 func
= getattr(telnet
, func_name
)
215 self
.assertTrue(expects
.startswith(data
))
218 self
.assertEqual(expects
, data
)
220 def _test_read_any_eager_B(self
, func_name
):
222 self
.dataq
.put([EOF_sigil
])
223 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
225 time
.sleep(self
.block_short
)
226 func
= getattr(telnet
, func_name
)
227 self
.assertRaises(EOFError, func
)
# read_eager and read_very_eager make the same guarantees
# (they behave differently but we only test the guarantees)
231 def test_read_very_eager_A(self
):
232 self
._test
_read
_any
_eager
_A
('read_very_eager')
233 def test_read_very_eager_B(self
):
234 self
._test
_read
_any
_eager
_B
('read_very_eager')
235 def test_read_eager_A(self
):
236 self
._test
_read
_any
_eager
_A
('read_eager')
237 def test_read_eager_B(self
):
238 self
._test
_read
_any
_eager
_B
('read_eager')
239 # NB -- we need to test the IAC block which is mentioned in the docstring
240 # but not in the module docs
242 def _test_read_any_lazy_B(self
, func_name
):
243 self
.dataq
.put([EOF_sigil
])
244 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
246 func
= getattr(telnet
, func_name
)
248 self
.assertRaises(EOFError, func
)
250 def test_read_lazy_A(self
):
251 want
= ['x' * 100, EOF_sigil
]
253 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
255 time
.sleep(self
.block_short
)
256 self
.assertEqual('', telnet
.read_lazy())
260 read_data
= telnet
.read_lazy()
266 self
.assertTrue(want
[0].startswith(data
))
267 self
.assertEqual(data
, want
[0])
269 def test_read_lazy_B(self
):
270 self
._test
_read
_any
_lazy
_B
('read_lazy')
272 def test_read_very_lazy_A(self
):
273 want
= ['x' * 100, EOF_sigil
]
275 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
277 time
.sleep(self
.block_short
)
278 self
.assertEqual('', telnet
.read_very_lazy())
282 read_data
= telnet
.read_very_lazy()
288 self
.assertEqual('', telnet
.cookedq
)
289 telnet
.process_rawq()
290 self
.assertTrue(want
[0].startswith(data
))
291 self
.assertEqual(data
, want
[0])
293 def test_read_very_lazy_B(self
):
294 self
._test
_read
_any
_lazy
_B
('read_very_lazy')
296 class nego_collector(object):
297 def __init__(self
, sb_getter
=None):
299 self
.sb_getter
= sb_getter
302 def do_nego(self
, sock
, cmd
, opt
):
303 self
.seen
+= cmd
+ opt
304 if cmd
== tl
.SE
and self
.sb_getter
:
305 sb_data
= self
.sb_getter()
306 self
.sb_seen
+= sb_data
309 class OptionTests(TestCase
):
311 tearDown
= _read_tearDown
313 cmds
= [tl
.AO
, tl
.AYT
, tl
.BRK
, tl
.EC
, tl
.EL
, tl
.GA
, tl
.IP
, tl
.NOP
]
315 def _test_command(self
, data
):
316 """ helper for testing IAC + cmd """
319 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
321 nego
= nego_collector()
322 telnet
.set_option_negotiation_callback(nego
.do_nego
)
323 txt
= telnet
.read_all()
325 self
.assertTrue(len(cmd
) > 0) # we expect at least one command
326 self
.assertIn(cmd
[0], self
.cmds
)
327 self
.assertEqual(cmd
[1], tl
.NOOPT
)
328 self
.assertEqual(len(''.join(data
[:-1])), len(txt
+ cmd
))
329 nego
.sb_getter
= None # break the nego => telnet cycle
332 def test_IAC_commands(self
):
334 self
.dataq
.put([EOF_sigil
])
335 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
339 for cmd
in self
.cmds
:
340 self
._test
_command
(['x' * 100, tl
.IAC
+ cmd
, 'y'*100, EOF_sigil
])
341 self
._test
_command
(['x' * 10, tl
.IAC
+ cmd
, 'y'*10, EOF_sigil
])
342 self
._test
_command
([tl
.IAC
+ cmd
, EOF_sigil
])
344 self
._test
_command
([tl
.IAC
+ cmd
for (cmd
) in self
.cmds
] + [EOF_sigil
])
345 self
.assertEqual('', telnet
.read_sb_data())
347 def test_SB_commands(self
):
348 # RFC 855, subnegotiations portion
349 send
= [tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.SE
,
350 tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.IAC
+ tl
.IAC
+ tl
.SE
,
351 tl
.IAC
+ tl
.SB
+ tl
.IAC
+ tl
.IAC
+ 'aa' + tl
.IAC
+ tl
.SE
,
352 tl
.IAC
+ tl
.SB
+ 'bb' + tl
.IAC
+ tl
.IAC
+ tl
.IAC
+ tl
.SE
,
353 tl
.IAC
+ tl
.SB
+ 'cc' + tl
.IAC
+ tl
.IAC
+ 'dd' + tl
.IAC
+ tl
.SE
,
357 telnet
= telnetlib
.Telnet(HOST
, self
.port
)
359 nego
= nego_collector(telnet
.read_sb_data
)
360 telnet
.set_option_negotiation_callback(nego
.do_nego
)
361 txt
= telnet
.read_all()
362 self
.assertEqual(txt
, '')
363 want_sb_data
= tl
.IAC
+ tl
.IAC
+ 'aabb' + tl
.IAC
+ 'cc' + tl
.IAC
+ 'dd'
364 self
.assertEqual(nego
.sb_seen
, want_sb_data
)
365 self
.assertEqual('', telnet
.read_sb_data())
366 nego
.sb_getter
= None # break the nego => telnet cycle
def test_main(verbose=None):
    """Run the three telnetlib test case classes defined in this module.

    GeneralTests, ReadTests and OptionTests are passed to
    test_support.run_unittest. The verbose argument is accepted but is not
    used by this function.
    """
    test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
371 if __name__
== '__main__':