]> git.proxmox.com Git - mirror_edk2.git/blob - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_telnetlib.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_telnetlib.py
1 import socket
2 import telnetlib
3 import time
4 import Queue
5
6 from unittest import TestCase
7 from test import test_support
8 threading = test_support.import_module('threading')
9
10 HOST = test_support.HOST
11 EOF_sigil = object()
12
13 def server(evt, serv, dataq=None):
14 """ Open a tcp server in three steps
15 1) set evt to true to let the parent know we are ready
16 2) [optional] if is not False, write the list of data from dataq.get()
17 to the socket.
18 3) set evt to true to let the parent know we're done
19 """
20 serv.listen(5)
21 evt.set()
22 try:
23 conn, addr = serv.accept()
24 if dataq:
25 data = ''
26 new_data = dataq.get(True, 0.5)
27 dataq.task_done()
28 for item in new_data:
29 if item == EOF_sigil:
30 break
31 if type(item) in [int, float]:
32 time.sleep(item)
33 else:
34 data += item
35 written = conn.send(data)
36 data = data[written:]
37 except socket.timeout:
38 pass
39 else:
40 conn.close()
41 finally:
42 serv.close()
43 evt.set()
44
45 class GeneralTests(TestCase):
46
47 def setUp(self):
48 self.evt = threading.Event()
49 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
50 self.sock.settimeout(3)
51 self.port = test_support.bind_port(self.sock)
52 self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
53 self.thread.start()
54 self.evt.wait()
55 self.evt.clear()
56 time.sleep(.1)
57
58 def tearDown(self):
59 self.evt.wait()
60 self.thread.join()
61
62 def testBasic(self):
63 # connects
64 telnet = telnetlib.Telnet(HOST, self.port)
65 telnet.sock.close()
66
67 def testTimeoutDefault(self):
68 self.assertTrue(socket.getdefaulttimeout() is None)
69 socket.setdefaulttimeout(30)
70 try:
71 telnet = telnetlib.Telnet("localhost", self.port)
72 finally:
73 socket.setdefaulttimeout(None)
74 self.assertEqual(telnet.sock.gettimeout(), 30)
75 telnet.sock.close()
76
77 def testTimeoutNone(self):
78 # None, having other default
79 self.assertTrue(socket.getdefaulttimeout() is None)
80 socket.setdefaulttimeout(30)
81 try:
82 telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
83 finally:
84 socket.setdefaulttimeout(None)
85 self.assertTrue(telnet.sock.gettimeout() is None)
86 telnet.sock.close()
87
88 def testTimeoutValue(self):
89 telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
90 self.assertEqual(telnet.sock.gettimeout(), 30)
91 telnet.sock.close()
92
93 def testTimeoutOpen(self):
94 telnet = telnetlib.Telnet()
95 telnet.open("localhost", self.port, timeout=30)
96 self.assertEqual(telnet.sock.gettimeout(), 30)
97 telnet.sock.close()
98
99 def _read_setUp(self):
100 self.evt = threading.Event()
101 self.dataq = Queue.Queue()
102 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
103 self.sock.settimeout(3)
104 self.port = test_support.bind_port(self.sock)
105 self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
106 self.thread.start()
107 self.evt.wait()
108 self.evt.clear()
109 time.sleep(.1)
110
111 def _read_tearDown(self):
112 self.evt.wait()
113 self.thread.join()
114
115 class ReadTests(TestCase):
116 setUp = _read_setUp
117 tearDown = _read_tearDown
118
119 # use a similar approach to testing timeouts as test_timeout.py
120 # these will never pass 100% but make the fuzz big enough that it is rare
121 block_long = 0.6
122 block_short = 0.3
123 def test_read_until_A(self):
124 """
125 read_until(expected, [timeout])
126 Read until the expected string has been seen, or a timeout is
127 hit (default is no timeout); may block.
128 """
129 want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
130 self.dataq.put(want)
131 telnet = telnetlib.Telnet(HOST, self.port)
132 self.dataq.join()
133 data = telnet.read_until('match')
134 self.assertEqual(data, ''.join(want[:-2]))
135
136 def test_read_until_B(self):
137 # test the timeout - it does NOT raise socket.timeout
138 want = ['hello', self.block_long, 'not seen', EOF_sigil]
139 self.dataq.put(want)
140 telnet = telnetlib.Telnet(HOST, self.port)
141 self.dataq.join()
142 data = telnet.read_until('not seen', self.block_short)
143 self.assertEqual(data, want[0])
144 self.assertEqual(telnet.read_all(), 'not seen')
145
146 def test_read_all_A(self):
147 """
148 read_all()
149 Read all data until EOF; may block.
150 """
151 want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
152 self.dataq.put(want)
153 telnet = telnetlib.Telnet(HOST, self.port)
154 self.dataq.join()
155 data = telnet.read_all()
156 self.assertEqual(data, ''.join(want[:-1]))
157 return
158
159 def _test_blocking(self, func):
160 self.dataq.put([self.block_long, EOF_sigil])
161 self.dataq.join()
162 start = time.time()
163 data = func()
164 self.assertTrue(self.block_short <= time.time() - start)
165
166 def test_read_all_B(self):
167 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
168
169 def test_read_all_C(self):
170 self.dataq.put([EOF_sigil])
171 telnet = telnetlib.Telnet(HOST, self.port)
172 self.dataq.join()
173 telnet.read_all()
174 telnet.read_all() # shouldn't raise
175
176 def test_read_some_A(self):
177 """
178 read_some()
179 Read at least one byte or EOF; may block.
180 """
181 # test 'at least one byte'
182 want = ['x' * 500, EOF_sigil]
183 self.dataq.put(want)
184 telnet = telnetlib.Telnet(HOST, self.port)
185 self.dataq.join()
186 data = telnet.read_all()
187 self.assertTrue(len(data) >= 1)
188
189 def test_read_some_B(self):
190 # test EOF
191 self.dataq.put([EOF_sigil])
192 telnet = telnetlib.Telnet(HOST, self.port)
193 self.dataq.join()
194 self.assertEqual('', telnet.read_some())
195
196 def test_read_some_C(self):
197 self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
198
199 def _test_read_any_eager_A(self, func_name):
200 """
201 read_very_eager()
202 Read all data available already queued or on the socket,
203 without blocking.
204 """
205 want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
206 expects = want[1] + want[2]
207 self.dataq.put(want)
208 telnet = telnetlib.Telnet(HOST, self.port)
209 self.dataq.join()
210 func = getattr(telnet, func_name)
211 data = ''
212 while True:
213 try:
214 data += func()
215 self.assertTrue(expects.startswith(data))
216 except EOFError:
217 break
218 self.assertEqual(expects, data)
219
220 def _test_read_any_eager_B(self, func_name):
221 # test EOF
222 self.dataq.put([EOF_sigil])
223 telnet = telnetlib.Telnet(HOST, self.port)
224 self.dataq.join()
225 time.sleep(self.block_short)
226 func = getattr(telnet, func_name)
227 self.assertRaises(EOFError, func)
228
229 # read_eager and read_very_eager make the same gaurantees
230 # (they behave differently but we only test the gaurantees)
231 def test_read_very_eager_A(self):
232 self._test_read_any_eager_A('read_very_eager')
233 def test_read_very_eager_B(self):
234 self._test_read_any_eager_B('read_very_eager')
235 def test_read_eager_A(self):
236 self._test_read_any_eager_A('read_eager')
237 def test_read_eager_B(self):
238 self._test_read_any_eager_B('read_eager')
239 # NB -- we need to test the IAC block which is mentioned in the docstring
240 # but not in the module docs
241
242 def _test_read_any_lazy_B(self, func_name):
243 self.dataq.put([EOF_sigil])
244 telnet = telnetlib.Telnet(HOST, self.port)
245 self.dataq.join()
246 func = getattr(telnet, func_name)
247 telnet.fill_rawq()
248 self.assertRaises(EOFError, func)
249
250 def test_read_lazy_A(self):
251 want = ['x' * 100, EOF_sigil]
252 self.dataq.put(want)
253 telnet = telnetlib.Telnet(HOST, self.port)
254 self.dataq.join()
255 time.sleep(self.block_short)
256 self.assertEqual('', telnet.read_lazy())
257 data = ''
258 while True:
259 try:
260 read_data = telnet.read_lazy()
261 data += read_data
262 if not read_data:
263 telnet.fill_rawq()
264 except EOFError:
265 break
266 self.assertTrue(want[0].startswith(data))
267 self.assertEqual(data, want[0])
268
269 def test_read_lazy_B(self):
270 self._test_read_any_lazy_B('read_lazy')
271
272 def test_read_very_lazy_A(self):
273 want = ['x' * 100, EOF_sigil]
274 self.dataq.put(want)
275 telnet = telnetlib.Telnet(HOST, self.port)
276 self.dataq.join()
277 time.sleep(self.block_short)
278 self.assertEqual('', telnet.read_very_lazy())
279 data = ''
280 while True:
281 try:
282 read_data = telnet.read_very_lazy()
283 except EOFError:
284 break
285 data += read_data
286 if not read_data:
287 telnet.fill_rawq()
288 self.assertEqual('', telnet.cookedq)
289 telnet.process_rawq()
290 self.assertTrue(want[0].startswith(data))
291 self.assertEqual(data, want[0])
292
293 def test_read_very_lazy_B(self):
294 self._test_read_any_lazy_B('read_very_lazy')
295
296 class nego_collector(object):
297 def __init__(self, sb_getter=None):
298 self.seen = ''
299 self.sb_getter = sb_getter
300 self.sb_seen = ''
301
302 def do_nego(self, sock, cmd, opt):
303 self.seen += cmd + opt
304 if cmd == tl.SE and self.sb_getter:
305 sb_data = self.sb_getter()
306 self.sb_seen += sb_data
307
308 tl = telnetlib
309 class OptionTests(TestCase):
310 setUp = _read_setUp
311 tearDown = _read_tearDown
312 # RFC 854 commands
313 cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
314
315 def _test_command(self, data):
316 """ helper for testing IAC + cmd """
317 self.setUp()
318 self.dataq.put(data)
319 telnet = telnetlib.Telnet(HOST, self.port)
320 self.dataq.join()
321 nego = nego_collector()
322 telnet.set_option_negotiation_callback(nego.do_nego)
323 txt = telnet.read_all()
324 cmd = nego.seen
325 self.assertTrue(len(cmd) > 0) # we expect at least one command
326 self.assertIn(cmd[0], self.cmds)
327 self.assertEqual(cmd[1], tl.NOOPT)
328 self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
329 nego.sb_getter = None # break the nego => telnet cycle
330 self.tearDown()
331
332 def test_IAC_commands(self):
333 # reset our setup
334 self.dataq.put([EOF_sigil])
335 telnet = telnetlib.Telnet(HOST, self.port)
336 self.dataq.join()
337 self.tearDown()
338
339 for cmd in self.cmds:
340 self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
341 self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
342 self._test_command([tl.IAC + cmd, EOF_sigil])
343 # all at once
344 self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
345 self.assertEqual('', telnet.read_sb_data())
346
347 def test_SB_commands(self):
348 # RFC 855, subnegotiations portion
349 send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
350 tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
351 tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
352 tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
353 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
354 EOF_sigil,
355 ]
356 self.dataq.put(send)
357 telnet = telnetlib.Telnet(HOST, self.port)
358 self.dataq.join()
359 nego = nego_collector(telnet.read_sb_data)
360 telnet.set_option_negotiation_callback(nego.do_nego)
361 txt = telnet.read_all()
362 self.assertEqual(txt, '')
363 want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
364 self.assertEqual(nego.sb_seen, want_sb_data)
365 self.assertEqual('', telnet.read_sb_data())
366 nego.sb_getter = None # break the nego => telnet cycle
367
368 def test_main(verbose=None):
369 test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
370
371 if __name__ == '__main__':
372 test_main()