]>
Commit | Line | Data |
---|---|---|
import asyncore
import email.utils
import select
import smtpd
import smtplib
import socket
import StringIO
import sys
import time
import unittest

from test import test_support

try:
    import threading
except ImportError:
    threading = None
18 | \r | |
# Host used by the test servers created with (HOST, 0) and by the SMTP
# clients that connect to them.
HOST = test_support.HOST
20 | \r | |
def server(evt, buf, serv):
    """Accept one connection on *serv* and send it the contents of *buf*.

    evt  -- event object; set once the socket is listening and set again
            when this function is done.
    buf  -- data to send to the client that connects.
    serv -- socket to listen on; it is always closed before returning.

    If accept() raises socket.timeout nothing is sent.
    """
    serv.listen(5)
    evt.set()  # signal the caller that the socket is now listening
    try:
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        try:
            # Cap the number of select/send rounds.
            rounds_left = 500
            while buf and rounds_left > 0:
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                rounds_left -= 1
        finally:
            # Close the connection even if select()/send() raised.
            conn.close()
    finally:
        serv.close()
        evt.set()
42 | \r | |
@unittest.skipUnless(threading, 'Threading required for this test.')
class GeneralTests(unittest.TestCase):
    """Connection-level smtplib.SMTP tests against the one-shot server()."""

    def setUp(self):
        self._threads = test_support.threading_setup()
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = test_support.bind_port(self.sock)
        servargs = (self.evt, "220 Hola mundo\n", self.sock)
        self.thread = threading.Thread(target=server, args=servargs)
        self.thread.start()
        # server() sets the event once it is listening; clear it so that
        # tearDown() can wait for the second set() issued when it finishes.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)

    def testBasic1(self):
        # connects
        smtp = smtplib.SMTP(HOST, self.port)
        smtp.close()

    def testBasic2(self):
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        try:
            self.assertEqual(smtp.local_hostname, "testhost")
        finally:
            # close the client even when the assertion fails
            smtp.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
            # never leak the modified global default into other tests
            socket.setdefaulttimeout(None)
        try:
            self.assertEqual(smtp.sock.gettimeout(), 30)
        finally:
            smtp.close()

    def testTimeoutNone(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        try:
            self.assertIsNone(smtp.sock.gettimeout())
        finally:
            smtp.close()

    def testTimeoutValue(self):
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
        try:
            self.assertEqual(smtp.sock.gettimeout(), 30)
        finally:
            smtp.close()
103 | \r | |
104 | \r | |
# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Run the asyncore polling loop for *serv* until the client is done.

    serv       -- server object; closed when the loop ends.
    serv_evt   -- event set on entry and set again just before returning.
    client_evt -- event the client sets when its conversation is finished.
    """
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        # Poll at most 1000 times with a 0.01 second timeout per call.
        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
136 | \r | |
# Delimiter lines expected around a message in the captured
# DebuggingServer output (see DebuggingServerTests.testSend).
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.
144 | \r | |
# Test behavior of smtpd.DebuggingServer
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
    """Exercise smtplib.SMTP against an smtpd.DebuggingServer thread."""

    def setUp(self):
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = StringIO.StringIO()
        sys.stdout = self.output
        # tearDown() is not run when setUp() fails, so restore sys.stdout
        # through a cleanup, which runs in either case.
        self.addCleanup(setattr, sys, 'stdout', self.old_stdout)

        self._threads = test_support.threading_setup()
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)
        # sys.stdout is restored by the cleanup registered in setUp()

    def _connect(self):
        """Return an SMTP client connected to the test server.

        The client is closed by a registered cleanup, so a failing
        assertion cannot leak its socket.
        """
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=3)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # connect
        smtp = self._connect()
        smtp.quit()

    def testNOOP(self):
        smtp = self._connect()
        expected = (250, 'Ok')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = self._connect()
        expected = (250, 'Ok')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testNotImplemented(self):
        # EHLO isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, 'Error: command "EHLO" not implemented')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testVRFY(self):
        # VRFY isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, 'Error: command "VRFY" not implemented')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = self._connect()
        smtp.helo()
        expected = (503, 'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = self._connect()
        self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        # Stop the server before inspecting what it printed.
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
242 | \r | |
243 | \r | |
class NonConnectingTests(unittest.TestCase):
    """Checks on smtplib.SMTP that need no server to connect to."""

    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise exceptions; both are expected to fail with
        # SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises socket.error
        self.assertRaises(socket.error, smtplib.SMTP,
                          "localhost", "bogus")
        self.assertRaises(socket.error, smtplib.SMTP,
                          "localhost:bogus")
262 | \r | |
263 | \r | |
# test response of client to a non-successful HELO message
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
    """Connect to a server() thread that greets with a 199 reply."""

    def setUp(self):
        self.old_stdout = sys.stdout
        self.output = StringIO.StringIO()
        sys.stdout = self.output
        # tearDown() is not run when setUp() fails, so restore sys.stdout
        # through a cleanup, which runs in either case.
        self.addCleanup(setattr, sys, 'stdout', self.old_stdout)

        self._threads = test_support.threading_setup()
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = test_support.bind_port(self.sock)
        servargs = (self.evt, "199 no hello for you!\n", self.sock)
        self.thread = threading.Thread(target=server, args=servargs)
        self.thread.start()
        # server() sets the event once it is listening; clear it so that
        # tearDown() can wait for the second set() issued when it finishes.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)
        # sys.stdout is restored by the cleanup registered in setUp()

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
                          HOST, self.port, 'localhost', 3)
293 | \r | |
294 | \r | |
# Address -> display name for the users the simulated server knows.
sim_users = {'Mr.A@somewhere.com':'John A',
             'Ms.B@somewhere.com':'Sally B',
             'Mrs.C@somewhereesle.com':'Ruth C',
            }

# (user, password) pair passed to SMTP.login() by the AUTH tests.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Challenge the simulated server sends in reply to "AUTH CRAM-MD5".
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# Encoded credentials per AUTH mechanism.  The 'login' and 'plain' values
# are base64; the 'cram-md5' value is compared against the text of the
# error raised by smtplib (see the comment in SMTPSimTests).
sim_auth_credentials = {
    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
    'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
                 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
    }
# Password text looked for in the error message by testAUTH_LOGIN.
sim_auth_login_password = 'C29TZXBHC3N3B3JK'

# Mailing list name -> member addresses (each one a key of sim_users).
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
             'list-2':['Ms.B@somewhere.com',],
            }
314 | \r | |
# Simulated SMTP channel & server
class SimSMTPChannel(smtpd.SMTPChannel):
    """SMTP channel giving canned EHLO, VRFY, EXPN and AUTH replies."""

    def __init__(self, extra_features, *args, **kw):
        # Render the additional EHLO capability lines once, up front.
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(feature) for feature in extra_features)
        smtpd.SMTPChannel.__init__(self, *args, **kw)

    def smtp_EHLO(self, arg):
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        # '250 HELP' (no dash) is the final line of the multi-line reply.
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)

    def smtp_VRFY(self, arg):
        raw_addr = email.utils.parseaddr(arg)[1]
        quoted_addr = smtplib.quoteaddr(arg)
        if raw_addr in sim_users:
            self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        list_name = email.utils.parseaddr(arg)[1].lower()
        if list_name not in sim_lists:
            self.push('550 No access for you!')
            return
        user_list = sim_lists[list_name]
        last_index = len(user_list) - 1
        for index, user_email in enumerate(user_list):
            quoted_addr = smtplib.quoteaddr(user_email)
            # every line but the last is a '250-' continuation line
            prefix = '250-' if index < last_index else '250 '
            self.push('%s%s %s' % (prefix, sim_users[user_email], quoted_addr))

    def smtp_AUTH(self, arg):
        syntax_error = '501 Syntax: AUTH mechanism [initial-response]'
        if not arg:
            # reply with an error instead of failing on a missing argument
            self.push(syntax_error)
            return
        if arg.strip().lower() == 'cram-md5':
            self.push('334 {0}'.format(sim_cram_md5_challenge))
            return
        parts = arg.split()
        if len(parts) != 2:
            # anything other than "<mechanism> <credentials>" cannot be
            # unpacked below
            self.push(syntax_error)
            return
        mech, auth = parts
        mech = mech.lower()
        if mech not in sim_auth_credentials:
            self.push('504 auth type unimplemented')
            return
        if mech == 'plain' and auth == sim_auth_credentials['plain']:
            self.push('235 plain auth ok')
        elif mech == 'login' and auth == sim_auth_credentials['login']:
            self.push('334 Password:')
        else:
            self.push('550 No access for you!')

    def handle_error(self):
        # re-raise the exception currently being handled
        raise
371 | \r | |
372 | \r | |
class SimSMTPServer(smtpd.SMTPServer):
    """SMTP server that hands each connection to a SimSMTPChannel."""

    def __init__(self, *args, **kw):
        # Extra EHLO features advertised by channels created afterwards.
        self._extra_features = []
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accept(self):
        pair = self.accept()
        if pair is None:
            # accept() may return None when no connection took place;
            # ignore the event and keep listening.
            return
        conn, addr = pair
        self._SMTPchannel = SimSMTPChannel(self._extra_features,
                                           self, conn, addr)

    def process_message(self, peer, mailfrom, rcpttos, data):
        # received messages are discarded
        pass

    def add_feature(self, feature):
        self._extra_features.append(feature)

    def handle_error(self):
        # re-raise the exception currently being handled
        raise
392 | \r | |
393 | \r | |
# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):
    """Exercise smtplib.SMTP against a SimSMTPServer thread."""

    def setUp(self):
        self._threads = test_support.threading_setup()
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        test_support.threading_cleanup(*self._threads)

    def _connect(self):
        """Return an SMTP client connected to the simulated server.

        The client is closed by a registered cleanup, so a failing
        assertion cannot leak its socket.
        """
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=15)
        self.addCleanup(smtp.close)
        return smtp

    def testBasic(self):
        # smoke test
        smtp = self._connect()
        smtp.quit()

    def testEHLO(self):
        smtp = self._connect()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()

        # 'address' rather than 'email' so the imported email package is
        # not shadowed inside this method
        for address, name in sim_users.items():
            expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(address)))
            self.assertEqual(smtp.vrfy(address), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, 'No such user: %s' % smtplib.quoteaddr(u))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._connect()

        for listname, members in sim_lists.items():
            users = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            expected_known = (250, '\n'.join(users))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, 'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._connect()

        expected_auth_ok = (235, b'plain auth ok')
        self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)

    # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they
    # require a synchronous read to obtain the credentials...so instead smtpd
    # sees the credential sent by smtplib's login method as an unknown command,
    # which results in smtplib raising an auth error.  Fortunately the error
    # message contains the encoded credential, so we can partially check that it
    # was generated correctly (partially, because the 'word' is uppercased in
    # the error message).

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._connect()
        try:
            smtp.login(sim_auth[0], sim_auth[1])
        except smtplib.SMTPAuthenticationError as err:
            if sim_auth_login_password not in str(err):
                # string exceptions are not allowed; report a test failure
                self.fail("expected encoded password not found in error message")

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._connect()

        try:
            smtp.login(sim_auth[0], sim_auth[1])
        except smtplib.SMTPAuthenticationError as err:
            if sim_auth_credentials['cram-md5'] not in str(err):
                # string exceptions are not allowed; report a test failure
                self.fail("expected encoded credentials not found in error message")

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.
510 | \r | |
511 | \r | |
def test_main(verbose=None):
    """Run every test case class defined in this module.

    verbose is accepted but not used by this function.
    """
    test_support.run_unittest(GeneralTests, DebuggingServerTests,
                              NonConnectingTests,
                              BadHELOServerTests, SMTPSimTests)
516 | \r | |
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()