]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_smtplib.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_smtplib.py
CommitLineData
4710c53d 1import asyncore\r
2import email.utils\r
3import socket\r
4import smtpd\r
5import smtplib\r
6import StringIO\r
7import sys\r
8import time\r
9import select\r
10\r
11import unittest\r
12from test import test_support\r
13\r
14try:\r
15 import threading\r
16except ImportError:\r
17 threading = None\r
18\r
19HOST = test_support.HOST\r
20\r
21def server(evt, buf, serv):\r
22 serv.listen(5)\r
23 evt.set()\r
24 try:\r
25 conn, addr = serv.accept()\r
26 except socket.timeout:\r
27 pass\r
28 else:\r
29 n = 500\r
30 while buf and n > 0:\r
31 r, w, e = select.select([], [conn], [])\r
32 if w:\r
33 sent = conn.send(buf)\r
34 buf = buf[sent:]\r
35\r
36 n -= 1\r
37\r
38 conn.close()\r
39 finally:\r
40 serv.close()\r
41 evt.set()\r
42\r
43@unittest.skipUnless(threading, 'Threading required for this test.')\r
44class GeneralTests(unittest.TestCase):\r
45\r
46 def setUp(self):\r
47 self._threads = test_support.threading_setup()\r
48 self.evt = threading.Event()\r
49 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
50 self.sock.settimeout(15)\r
51 self.port = test_support.bind_port(self.sock)\r
52 servargs = (self.evt, "220 Hola mundo\n", self.sock)\r
53 self.thread = threading.Thread(target=server, args=servargs)\r
54 self.thread.start()\r
55 self.evt.wait()\r
56 self.evt.clear()\r
57\r
58 def tearDown(self):\r
59 self.evt.wait()\r
60 self.thread.join()\r
61 test_support.threading_cleanup(*self._threads)\r
62\r
63 def testBasic1(self):\r
64 # connects\r
65 smtp = smtplib.SMTP(HOST, self.port)\r
66 smtp.close()\r
67\r
68 def testBasic2(self):\r
69 # connects, include port in host name\r
70 smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))\r
71 smtp.close()\r
72\r
73 def testLocalHostName(self):\r
74 # check that supplied local_hostname is used\r
75 smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")\r
76 self.assertEqual(smtp.local_hostname, "testhost")\r
77 smtp.close()\r
78\r
79 def testTimeoutDefault(self):\r
80 self.assertTrue(socket.getdefaulttimeout() is None)\r
81 socket.setdefaulttimeout(30)\r
82 try:\r
83 smtp = smtplib.SMTP(HOST, self.port)\r
84 finally:\r
85 socket.setdefaulttimeout(None)\r
86 self.assertEqual(smtp.sock.gettimeout(), 30)\r
87 smtp.close()\r
88\r
89 def testTimeoutNone(self):\r
90 self.assertTrue(socket.getdefaulttimeout() is None)\r
91 socket.setdefaulttimeout(30)\r
92 try:\r
93 smtp = smtplib.SMTP(HOST, self.port, timeout=None)\r
94 finally:\r
95 socket.setdefaulttimeout(None)\r
96 self.assertTrue(smtp.sock.gettimeout() is None)\r
97 smtp.close()\r
98\r
99 def testTimeoutValue(self):\r
100 smtp = smtplib.SMTP(HOST, self.port, timeout=30)\r
101 self.assertEqual(smtp.sock.gettimeout(), 30)\r
102 smtp.close()\r
103\r
104\r
105# Test server thread using the specified SMTP server class\r
106def debugging_server(serv, serv_evt, client_evt):\r
107 serv_evt.set()\r
108\r
109 try:\r
110 if hasattr(select, 'poll'):\r
111 poll_fun = asyncore.poll2\r
112 else:\r
113 poll_fun = asyncore.poll\r
114\r
115 n = 1000\r
116 while asyncore.socket_map and n > 0:\r
117 poll_fun(0.01, asyncore.socket_map)\r
118\r
119 # when the client conversation is finished, it will\r
120 # set client_evt, and it's then ok to kill the server\r
121 if client_evt.is_set():\r
122 serv.close()\r
123 break\r
124\r
125 n -= 1\r
126\r
127 except socket.timeout:\r
128 pass\r
129 finally:\r
130 if not client_evt.is_set():\r
131 # allow some time for the client to read the result\r
132 time.sleep(0.5)\r
133 serv.close()\r
134 asyncore.close_all()\r
135 serv_evt.set()\r
136\r
137MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'\r
138MSG_END = '------------ END MESSAGE ------------\n'\r
139\r
140# NOTE: Some SMTP objects in the tests below are created with a non-default\r
141# local_hostname argument to the constructor, since (on some systems) the FQDN\r
142# lookup caused by the default local_hostname sometimes takes so long that the\r
143# test server times out, causing the test to fail.\r
144\r
145# Test behavior of smtpd.DebuggingServer\r
146@unittest.skipUnless(threading, 'Threading required for this test.')\r
147class DebuggingServerTests(unittest.TestCase):\r
148\r
149 def setUp(self):\r
150 # temporarily replace sys.stdout to capture DebuggingServer output\r
151 self.old_stdout = sys.stdout\r
152 self.output = StringIO.StringIO()\r
153 sys.stdout = self.output\r
154\r
155 self._threads = test_support.threading_setup()\r
156 self.serv_evt = threading.Event()\r
157 self.client_evt = threading.Event()\r
158 # Pick a random unused port by passing 0 for the port number\r
159 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))\r
160 # Keep a note of what port was assigned\r
161 self.port = self.serv.socket.getsockname()[1]\r
162 serv_args = (self.serv, self.serv_evt, self.client_evt)\r
163 self.thread = threading.Thread(target=debugging_server, args=serv_args)\r
164 self.thread.start()\r
165\r
166 # wait until server thread has assigned a port number\r
167 self.serv_evt.wait()\r
168 self.serv_evt.clear()\r
169\r
170 def tearDown(self):\r
171 # indicate that the client is finished\r
172 self.client_evt.set()\r
173 # wait for the server thread to terminate\r
174 self.serv_evt.wait()\r
175 self.thread.join()\r
176 test_support.threading_cleanup(*self._threads)\r
177 # restore sys.stdout\r
178 sys.stdout = self.old_stdout\r
179\r
180 def testBasic(self):\r
181 # connect\r
182 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
183 smtp.quit()\r
184\r
185 def testNOOP(self):\r
186 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
187 expected = (250, 'Ok')\r
188 self.assertEqual(smtp.noop(), expected)\r
189 smtp.quit()\r
190\r
191 def testRSET(self):\r
192 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
193 expected = (250, 'Ok')\r
194 self.assertEqual(smtp.rset(), expected)\r
195 smtp.quit()\r
196\r
197 def testNotImplemented(self):\r
198 # EHLO isn't implemented in DebuggingServer\r
199 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
200 expected = (502, 'Error: command "EHLO" not implemented')\r
201 self.assertEqual(smtp.ehlo(), expected)\r
202 smtp.quit()\r
203\r
204 def testVRFY(self):\r
205 # VRFY isn't implemented in DebuggingServer\r
206 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
207 expected = (502, 'Error: command "VRFY" not implemented')\r
208 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)\r
209 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)\r
210 smtp.quit()\r
211\r
212 def testSecondHELO(self):\r
213 # check that a second HELO returns a message that it's a duplicate\r
214 # (this behavior is specific to smtpd.SMTPChannel)\r
215 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
216 smtp.helo()\r
217 expected = (503, 'Duplicate HELO/EHLO')\r
218 self.assertEqual(smtp.helo(), expected)\r
219 smtp.quit()\r
220\r
221 def testHELP(self):\r
222 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
223 self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')\r
224 smtp.quit()\r
225\r
226 def testSend(self):\r
227 # connect and send mail\r
228 m = 'A test message'\r
229 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
230 smtp.sendmail('John', 'Sally', m)\r
231 # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor\r
232 # in asyncore. This sleep might help, but should really be fixed\r
233 # properly by using an Event variable.\r
234 time.sleep(0.01)\r
235 smtp.quit()\r
236\r
237 self.client_evt.set()\r
238 self.serv_evt.wait()\r
239 self.output.flush()\r
240 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)\r
241 self.assertEqual(self.output.getvalue(), mexpect)\r
242\r
243\r
244class NonConnectingTests(unittest.TestCase):\r
245\r
246 def testNotConnected(self):\r
247 # Test various operations on an unconnected SMTP object that\r
248 # should raise exceptions (at present the attempt in SMTP.send\r
249 # to reference the nonexistent 'sock' attribute of the SMTP object\r
250 # causes an AttributeError)\r
251 smtp = smtplib.SMTP()\r
252 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)\r
253 self.assertRaises(smtplib.SMTPServerDisconnected,\r
254 smtp.send, 'test msg')\r
255\r
256 def testNonnumericPort(self):\r
257 # check that non-numeric port raises socket.error\r
258 self.assertRaises(socket.error, smtplib.SMTP,\r
259 "localhost", "bogus")\r
260 self.assertRaises(socket.error, smtplib.SMTP,\r
261 "localhost:bogus")\r
262\r
263\r
264# test response of client to a non-successful HELO message\r
265@unittest.skipUnless(threading, 'Threading required for this test.')\r
266class BadHELOServerTests(unittest.TestCase):\r
267\r
268 def setUp(self):\r
269 self.old_stdout = sys.stdout\r
270 self.output = StringIO.StringIO()\r
271 sys.stdout = self.output\r
272\r
273 self._threads = test_support.threading_setup()\r
274 self.evt = threading.Event()\r
275 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
276 self.sock.settimeout(15)\r
277 self.port = test_support.bind_port(self.sock)\r
278 servargs = (self.evt, "199 no hello for you!\n", self.sock)\r
279 self.thread = threading.Thread(target=server, args=servargs)\r
280 self.thread.start()\r
281 self.evt.wait()\r
282 self.evt.clear()\r
283\r
284 def tearDown(self):\r
285 self.evt.wait()\r
286 self.thread.join()\r
287 test_support.threading_cleanup(*self._threads)\r
288 sys.stdout = self.old_stdout\r
289\r
290 def testFailingHELO(self):\r
291 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,\r
292 HOST, self.port, 'localhost', 3)\r
293\r
294\r
295sim_users = {'Mr.A@somewhere.com':'John A',\r
296 'Ms.B@somewhere.com':'Sally B',\r
297 'Mrs.C@somewhereesle.com':'Ruth C',\r
298 }\r
299\r
300sim_auth = ('Mr.A@somewhere.com', 'somepassword')\r
301sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'\r
302 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')\r
303sim_auth_credentials = {\r
304 'login': 'TXIuQUBzb21ld2hlcmUuY29t',\r
305 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',\r
306 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'\r
307 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),\r
308 }\r
309sim_auth_login_password = 'C29TZXBHC3N3B3JK'\r
310\r
311sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],\r
312 'list-2':['Ms.B@somewhere.com',],\r
313 }\r
314\r
315# Simulated SMTP channel & server\r
316class SimSMTPChannel(smtpd.SMTPChannel):\r
317\r
318 def __init__(self, extra_features, *args, **kw):\r
319 self._extrafeatures = ''.join(\r
320 [ "250-{0}\r\n".format(x) for x in extra_features ])\r
321 smtpd.SMTPChannel.__init__(self, *args, **kw)\r
322\r
323 def smtp_EHLO(self, arg):\r
324 resp = ('250-testhost\r\n'\r
325 '250-EXPN\r\n'\r
326 '250-SIZE 20000000\r\n'\r
327 '250-STARTTLS\r\n'\r
328 '250-DELIVERBY\r\n')\r
329 resp = resp + self._extrafeatures + '250 HELP'\r
330 self.push(resp)\r
331\r
332 def smtp_VRFY(self, arg):\r
333 raw_addr = email.utils.parseaddr(arg)[1]\r
334 quoted_addr = smtplib.quoteaddr(arg)\r
335 if raw_addr in sim_users:\r
336 self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))\r
337 else:\r
338 self.push('550 No such user: %s' % arg)\r
339\r
340 def smtp_EXPN(self, arg):\r
341 list_name = email.utils.parseaddr(arg)[1].lower()\r
342 if list_name in sim_lists:\r
343 user_list = sim_lists[list_name]\r
344 for n, user_email in enumerate(user_list):\r
345 quoted_addr = smtplib.quoteaddr(user_email)\r
346 if n < len(user_list) - 1:\r
347 self.push('250-%s %s' % (sim_users[user_email], quoted_addr))\r
348 else:\r
349 self.push('250 %s %s' % (sim_users[user_email], quoted_addr))\r
350 else:\r
351 self.push('550 No access for you!')\r
352\r
353 def smtp_AUTH(self, arg):\r
354 if arg.strip().lower()=='cram-md5':\r
355 self.push('334 {0}'.format(sim_cram_md5_challenge))\r
356 return\r
357 mech, auth = arg.split()\r
358 mech = mech.lower()\r
359 if mech not in sim_auth_credentials:\r
360 self.push('504 auth type unimplemented')\r
361 return\r
362 if mech == 'plain' and auth==sim_auth_credentials['plain']:\r
363 self.push('235 plain auth ok')\r
364 elif mech=='login' and auth==sim_auth_credentials['login']:\r
365 self.push('334 Password:')\r
366 else:\r
367 self.push('550 No access for you!')\r
368\r
369 def handle_error(self):\r
370 raise\r
371\r
372\r
373class SimSMTPServer(smtpd.SMTPServer):\r
374\r
375 def __init__(self, *args, **kw):\r
376 self._extra_features = []\r
377 smtpd.SMTPServer.__init__(self, *args, **kw)\r
378\r
379 def handle_accept(self):\r
380 conn, addr = self.accept()\r
381 self._SMTPchannel = SimSMTPChannel(self._extra_features,\r
382 self, conn, addr)\r
383\r
384 def process_message(self, peer, mailfrom, rcpttos, data):\r
385 pass\r
386\r
387 def add_feature(self, feature):\r
388 self._extra_features.append(feature)\r
389\r
390 def handle_error(self):\r
391 raise\r
392\r
393\r
394# Test various SMTP & ESMTP commands/behaviors that require a simulated server\r
395# (i.e., something with more features than DebuggingServer)\r
396@unittest.skipUnless(threading, 'Threading required for this test.')\r
397class SMTPSimTests(unittest.TestCase):\r
398\r
399 def setUp(self):\r
400 self._threads = test_support.threading_setup()\r
401 self.serv_evt = threading.Event()\r
402 self.client_evt = threading.Event()\r
403 # Pick a random unused port by passing 0 for the port number\r
404 self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))\r
405 # Keep a note of what port was assigned\r
406 self.port = self.serv.socket.getsockname()[1]\r
407 serv_args = (self.serv, self.serv_evt, self.client_evt)\r
408 self.thread = threading.Thread(target=debugging_server, args=serv_args)\r
409 self.thread.start()\r
410\r
411 # wait until server thread has assigned a port number\r
412 self.serv_evt.wait()\r
413 self.serv_evt.clear()\r
414\r
415 def tearDown(self):\r
416 # indicate that the client is finished\r
417 self.client_evt.set()\r
418 # wait for the server thread to terminate\r
419 self.serv_evt.wait()\r
420 self.thread.join()\r
421 test_support.threading_cleanup(*self._threads)\r
422\r
423 def testBasic(self):\r
424 # smoke test\r
425 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
426 smtp.quit()\r
427\r
428 def testEHLO(self):\r
429 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
430\r
431 # no features should be present before the EHLO\r
432 self.assertEqual(smtp.esmtp_features, {})\r
433\r
434 # features expected from the test server\r
435 expected_features = {'expn':'',\r
436 'size': '20000000',\r
437 'starttls': '',\r
438 'deliverby': '',\r
439 'help': '',\r
440 }\r
441\r
442 smtp.ehlo()\r
443 self.assertEqual(smtp.esmtp_features, expected_features)\r
444 for k in expected_features:\r
445 self.assertTrue(smtp.has_extn(k))\r
446 self.assertFalse(smtp.has_extn('unsupported-feature'))\r
447 smtp.quit()\r
448\r
449 def testVRFY(self):\r
450 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
451\r
452 for email, name in sim_users.items():\r
453 expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))\r
454 self.assertEqual(smtp.vrfy(email), expected_known)\r
455\r
456 u = 'nobody@nowhere.com'\r
457 expected_unknown = (550, 'No such user: %s' % smtplib.quoteaddr(u))\r
458 self.assertEqual(smtp.vrfy(u), expected_unknown)\r
459 smtp.quit()\r
460\r
461 def testEXPN(self):\r
462 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
463\r
464 for listname, members in sim_lists.items():\r
465 users = []\r
466 for m in members:\r
467 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))\r
468 expected_known = (250, '\n'.join(users))\r
469 self.assertEqual(smtp.expn(listname), expected_known)\r
470\r
471 u = 'PSU-Members-List'\r
472 expected_unknown = (550, 'No access for you!')\r
473 self.assertEqual(smtp.expn(u), expected_unknown)\r
474 smtp.quit()\r
475\r
476 def testAUTH_PLAIN(self):\r
477 self.serv.add_feature("AUTH PLAIN")\r
478 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
479\r
480 expected_auth_ok = (235, b'plain auth ok')\r
481 self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)\r
482\r
483 # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they\r
484 # require a synchronous read to obtain the credentials...so instead smtpd\r
485 # sees the credential sent by smtplib's login method as an unknown command,\r
486 # which results in smtplib raising an auth error. Fortunately the error\r
487 # message contains the encoded credential, so we can partially check that it\r
488 # was generated correctly (partially, because the 'word' is uppercased in\r
489 # the error message).\r
490\r
491 def testAUTH_LOGIN(self):\r
492 self.serv.add_feature("AUTH LOGIN")\r
493 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
494 try: smtp.login(sim_auth[0], sim_auth[1])\r
495 except smtplib.SMTPAuthenticationError as err:\r
496 if sim_auth_login_password not in str(err):\r
497 raise "expected encoded password not found in error message"\r
498\r
499 def testAUTH_CRAM_MD5(self):\r
500 self.serv.add_feature("AUTH CRAM-MD5")\r
501 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
502\r
503 try: smtp.login(sim_auth[0], sim_auth[1])\r
504 except smtplib.SMTPAuthenticationError as err:\r
505 if sim_auth_credentials['cram-md5'] not in str(err):\r
506 raise "expected encoded credentials not found in error message"\r
507\r
508 #TODO: add tests for correct AUTH method fallback now that the\r
509 #test infrastructure can support it.\r
510\r
511\r
512def test_main(verbose=None):\r
513 test_support.run_unittest(GeneralTests, DebuggingServerTests,\r
514 NonConnectingTests,\r
515 BadHELOServerTests, SMTPSimTests)\r
516\r
517if __name__ == '__main__':\r
518 test_main()\r