+++ /dev/null
-import asyncore\r
-import email.utils\r
-import socket\r
-import smtpd\r
-import smtplib\r
-import StringIO\r
-import sys\r
-import time\r
-import select\r
-\r
-import unittest\r
-from test import test_support\r
-\r
-try:\r
- import threading\r
-except ImportError:\r
- threading = None\r
-\r
-HOST = test_support.HOST\r
-\r
-def server(evt, buf, serv):\r
- serv.listen(5)\r
- evt.set()\r
- try:\r
- conn, addr = serv.accept()\r
- except socket.timeout:\r
- pass\r
- else:\r
- n = 500\r
- while buf and n > 0:\r
- r, w, e = select.select([], [conn], [])\r
- if w:\r
- sent = conn.send(buf)\r
- buf = buf[sent:]\r
-\r
- n -= 1\r
-\r
- conn.close()\r
- finally:\r
- serv.close()\r
- evt.set()\r
-\r
-@unittest.skipUnless(threading, 'Threading required for this test.')\r
-class GeneralTests(unittest.TestCase):\r
-\r
- def setUp(self):\r
- self._threads = test_support.threading_setup()\r
- self.evt = threading.Event()\r
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
- self.sock.settimeout(15)\r
- self.port = test_support.bind_port(self.sock)\r
- servargs = (self.evt, "220 Hola mundo\n", self.sock)\r
- self.thread = threading.Thread(target=server, args=servargs)\r
- self.thread.start()\r
- self.evt.wait()\r
- self.evt.clear()\r
-\r
- def tearDown(self):\r
- self.evt.wait()\r
- self.thread.join()\r
- test_support.threading_cleanup(*self._threads)\r
-\r
- def testBasic1(self):\r
- # connects\r
- smtp = smtplib.SMTP(HOST, self.port)\r
- smtp.close()\r
-\r
- def testBasic2(self):\r
- # connects, include port in host name\r
- smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))\r
- smtp.close()\r
-\r
- def testLocalHostName(self):\r
- # check that supplied local_hostname is used\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")\r
- self.assertEqual(smtp.local_hostname, "testhost")\r
- smtp.close()\r
-\r
- def testTimeoutDefault(self):\r
- self.assertTrue(socket.getdefaulttimeout() is None)\r
- socket.setdefaulttimeout(30)\r
- try:\r
- smtp = smtplib.SMTP(HOST, self.port)\r
- finally:\r
- socket.setdefaulttimeout(None)\r
- self.assertEqual(smtp.sock.gettimeout(), 30)\r
- smtp.close()\r
-\r
- def testTimeoutNone(self):\r
- self.assertTrue(socket.getdefaulttimeout() is None)\r
- socket.setdefaulttimeout(30)\r
- try:\r
- smtp = smtplib.SMTP(HOST, self.port, timeout=None)\r
- finally:\r
- socket.setdefaulttimeout(None)\r
- self.assertTrue(smtp.sock.gettimeout() is None)\r
- smtp.close()\r
-\r
- def testTimeoutValue(self):\r
- smtp = smtplib.SMTP(HOST, self.port, timeout=30)\r
- self.assertEqual(smtp.sock.gettimeout(), 30)\r
- smtp.close()\r
-\r
-\r
-# Test server thread using the specified SMTP server class\r
-def debugging_server(serv, serv_evt, client_evt):\r
- serv_evt.set()\r
-\r
- try:\r
- if hasattr(select, 'poll'):\r
- poll_fun = asyncore.poll2\r
- else:\r
- poll_fun = asyncore.poll\r
-\r
- n = 1000\r
- while asyncore.socket_map and n > 0:\r
- poll_fun(0.01, asyncore.socket_map)\r
-\r
- # when the client conversation is finished, it will\r
- # set client_evt, and it's then ok to kill the server\r
- if client_evt.is_set():\r
- serv.close()\r
- break\r
-\r
- n -= 1\r
-\r
- except socket.timeout:\r
- pass\r
- finally:\r
- if not client_evt.is_set():\r
- # allow some time for the client to read the result\r
- time.sleep(0.5)\r
- serv.close()\r
- asyncore.close_all()\r
- serv_evt.set()\r
-\r
-MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'\r
-MSG_END = '------------ END MESSAGE ------------\n'\r
-\r
-# NOTE: Some SMTP objects in the tests below are created with a non-default\r
-# local_hostname argument to the constructor, since (on some systems) the FQDN\r
-# lookup caused by the default local_hostname sometimes takes so long that the\r
-# test server times out, causing the test to fail.\r
-\r
-# Test behavior of smtpd.DebuggingServer\r
-@unittest.skipUnless(threading, 'Threading required for this test.')\r
-class DebuggingServerTests(unittest.TestCase):\r
-\r
- def setUp(self):\r
- # temporarily replace sys.stdout to capture DebuggingServer output\r
- self.old_stdout = sys.stdout\r
- self.output = StringIO.StringIO()\r
- sys.stdout = self.output\r
-\r
- self._threads = test_support.threading_setup()\r
- self.serv_evt = threading.Event()\r
- self.client_evt = threading.Event()\r
- # Pick a random unused port by passing 0 for the port number\r
- self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))\r
- # Keep a note of what port was assigned\r
- self.port = self.serv.socket.getsockname()[1]\r
- serv_args = (self.serv, self.serv_evt, self.client_evt)\r
- self.thread = threading.Thread(target=debugging_server, args=serv_args)\r
- self.thread.start()\r
-\r
- # wait until server thread has assigned a port number\r
- self.serv_evt.wait()\r
- self.serv_evt.clear()\r
-\r
- def tearDown(self):\r
- # indicate that the client is finished\r
- self.client_evt.set()\r
- # wait for the server thread to terminate\r
- self.serv_evt.wait()\r
- self.thread.join()\r
- test_support.threading_cleanup(*self._threads)\r
- # restore sys.stdout\r
- sys.stdout = self.old_stdout\r
-\r
- def testBasic(self):\r
- # connect\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- smtp.quit()\r
-\r
- def testNOOP(self):\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- expected = (250, 'Ok')\r
- self.assertEqual(smtp.noop(), expected)\r
- smtp.quit()\r
-\r
- def testRSET(self):\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- expected = (250, 'Ok')\r
- self.assertEqual(smtp.rset(), expected)\r
- smtp.quit()\r
-\r
- def testNotImplemented(self):\r
- # EHLO isn't implemented in DebuggingServer\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- expected = (502, 'Error: command "EHLO" not implemented')\r
- self.assertEqual(smtp.ehlo(), expected)\r
- smtp.quit()\r
-\r
- def testVRFY(self):\r
- # VRFY isn't implemented in DebuggingServer\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- expected = (502, 'Error: command "VRFY" not implemented')\r
- self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)\r
- self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)\r
- smtp.quit()\r
-\r
- def testSecondHELO(self):\r
- # check that a second HELO returns a message that it's a duplicate\r
- # (this behavior is specific to smtpd.SMTPChannel)\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- smtp.helo()\r
- expected = (503, 'Duplicate HELO/EHLO')\r
- self.assertEqual(smtp.helo(), expected)\r
- smtp.quit()\r
-\r
- def testHELP(self):\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')\r
- smtp.quit()\r
-\r
- def testSend(self):\r
- # connect and send mail\r
- m = 'A test message'\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)\r
- smtp.sendmail('John', 'Sally', m)\r
- # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor\r
- # in asyncore. This sleep might help, but should really be fixed\r
- # properly by using an Event variable.\r
- time.sleep(0.01)\r
- smtp.quit()\r
-\r
- self.client_evt.set()\r
- self.serv_evt.wait()\r
- self.output.flush()\r
- mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)\r
- self.assertEqual(self.output.getvalue(), mexpect)\r
-\r
-\r
-class NonConnectingTests(unittest.TestCase):\r
-\r
- def testNotConnected(self):\r
- # Test various operations on an unconnected SMTP object that\r
- # should raise exceptions (at present the attempt in SMTP.send\r
- # to reference the nonexistent 'sock' attribute of the SMTP object\r
- # causes an AttributeError)\r
- smtp = smtplib.SMTP()\r
- self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)\r
- self.assertRaises(smtplib.SMTPServerDisconnected,\r
- smtp.send, 'test msg')\r
-\r
- def testNonnumericPort(self):\r
- # check that non-numeric port raises socket.error\r
- self.assertRaises(socket.error, smtplib.SMTP,\r
- "localhost", "bogus")\r
- self.assertRaises(socket.error, smtplib.SMTP,\r
- "localhost:bogus")\r
-\r
-\r
-# test response of client to a non-successful HELO message\r
-@unittest.skipUnless(threading, 'Threading required for this test.')\r
-class BadHELOServerTests(unittest.TestCase):\r
-\r
- def setUp(self):\r
- self.old_stdout = sys.stdout\r
- self.output = StringIO.StringIO()\r
- sys.stdout = self.output\r
-\r
- self._threads = test_support.threading_setup()\r
- self.evt = threading.Event()\r
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
- self.sock.settimeout(15)\r
- self.port = test_support.bind_port(self.sock)\r
- servargs = (self.evt, "199 no hello for you!\n", self.sock)\r
- self.thread = threading.Thread(target=server, args=servargs)\r
- self.thread.start()\r
- self.evt.wait()\r
- self.evt.clear()\r
-\r
- def tearDown(self):\r
- self.evt.wait()\r
- self.thread.join()\r
- test_support.threading_cleanup(*self._threads)\r
- sys.stdout = self.old_stdout\r
-\r
- def testFailingHELO(self):\r
- self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,\r
- HOST, self.port, 'localhost', 3)\r
-\r
-\r
-sim_users = {'Mr.A@somewhere.com':'John A',\r
- 'Ms.B@somewhere.com':'Sally B',\r
- 'Mrs.C@somewhereesle.com':'Ruth C',\r
- }\r
-\r
-sim_auth = ('Mr.A@somewhere.com', 'somepassword')\r
-sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'\r
- 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')\r
-sim_auth_credentials = {\r
- 'login': 'TXIuQUBzb21ld2hlcmUuY29t',\r
- 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',\r
- 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'\r
- 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),\r
- }\r
-sim_auth_login_password = 'C29TZXBHC3N3B3JK'\r
-\r
-sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],\r
- 'list-2':['Ms.B@somewhere.com',],\r
- }\r
-\r
-# Simulated SMTP channel & server\r
-class SimSMTPChannel(smtpd.SMTPChannel):\r
-\r
- def __init__(self, extra_features, *args, **kw):\r
- self._extrafeatures = ''.join(\r
- [ "250-{0}\r\n".format(x) for x in extra_features ])\r
- smtpd.SMTPChannel.__init__(self, *args, **kw)\r
-\r
- def smtp_EHLO(self, arg):\r
- resp = ('250-testhost\r\n'\r
- '250-EXPN\r\n'\r
- '250-SIZE 20000000\r\n'\r
- '250-STARTTLS\r\n'\r
- '250-DELIVERBY\r\n')\r
- resp = resp + self._extrafeatures + '250 HELP'\r
- self.push(resp)\r
-\r
- def smtp_VRFY(self, arg):\r
- raw_addr = email.utils.parseaddr(arg)[1]\r
- quoted_addr = smtplib.quoteaddr(arg)\r
- if raw_addr in sim_users:\r
- self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))\r
- else:\r
- self.push('550 No such user: %s' % arg)\r
-\r
- def smtp_EXPN(self, arg):\r
- list_name = email.utils.parseaddr(arg)[1].lower()\r
- if list_name in sim_lists:\r
- user_list = sim_lists[list_name]\r
- for n, user_email in enumerate(user_list):\r
- quoted_addr = smtplib.quoteaddr(user_email)\r
- if n < len(user_list) - 1:\r
- self.push('250-%s %s' % (sim_users[user_email], quoted_addr))\r
- else:\r
- self.push('250 %s %s' % (sim_users[user_email], quoted_addr))\r
- else:\r
- self.push('550 No access for you!')\r
-\r
- def smtp_AUTH(self, arg):\r
- if arg.strip().lower()=='cram-md5':\r
- self.push('334 {0}'.format(sim_cram_md5_challenge))\r
- return\r
- mech, auth = arg.split()\r
- mech = mech.lower()\r
- if mech not in sim_auth_credentials:\r
- self.push('504 auth type unimplemented')\r
- return\r
- if mech == 'plain' and auth==sim_auth_credentials['plain']:\r
- self.push('235 plain auth ok')\r
- elif mech=='login' and auth==sim_auth_credentials['login']:\r
- self.push('334 Password:')\r
- else:\r
- self.push('550 No access for you!')\r
-\r
- def handle_error(self):\r
- raise\r
-\r
-\r
-class SimSMTPServer(smtpd.SMTPServer):\r
-\r
- def __init__(self, *args, **kw):\r
- self._extra_features = []\r
- smtpd.SMTPServer.__init__(self, *args, **kw)\r
-\r
- def handle_accept(self):\r
- conn, addr = self.accept()\r
- self._SMTPchannel = SimSMTPChannel(self._extra_features,\r
- self, conn, addr)\r
-\r
- def process_message(self, peer, mailfrom, rcpttos, data):\r
- pass\r
-\r
- def add_feature(self, feature):\r
- self._extra_features.append(feature)\r
-\r
- def handle_error(self):\r
- raise\r
-\r
-\r
-# Test various SMTP & ESMTP commands/behaviors that require a simulated server\r
-# (i.e., something with more features than DebuggingServer)\r
-@unittest.skipUnless(threading, 'Threading required for this test.')\r
-class SMTPSimTests(unittest.TestCase):\r
-\r
- def setUp(self):\r
- self._threads = test_support.threading_setup()\r
- self.serv_evt = threading.Event()\r
- self.client_evt = threading.Event()\r
- # Pick a random unused port by passing 0 for the port number\r
- self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))\r
- # Keep a note of what port was assigned\r
- self.port = self.serv.socket.getsockname()[1]\r
- serv_args = (self.serv, self.serv_evt, self.client_evt)\r
- self.thread = threading.Thread(target=debugging_server, args=serv_args)\r
- self.thread.start()\r
-\r
- # wait until server thread has assigned a port number\r
- self.serv_evt.wait()\r
- self.serv_evt.clear()\r
-\r
- def tearDown(self):\r
- # indicate that the client is finished\r
- self.client_evt.set()\r
- # wait for the server thread to terminate\r
- self.serv_evt.wait()\r
- self.thread.join()\r
- test_support.threading_cleanup(*self._threads)\r
-\r
- def testBasic(self):\r
- # smoke test\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
- smtp.quit()\r
-\r
- def testEHLO(self):\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
-\r
- # no features should be present before the EHLO\r
- self.assertEqual(smtp.esmtp_features, {})\r
-\r
- # features expected from the test server\r
- expected_features = {'expn':'',\r
- 'size': '20000000',\r
- 'starttls': '',\r
- 'deliverby': '',\r
- 'help': '',\r
- }\r
-\r
- smtp.ehlo()\r
- self.assertEqual(smtp.esmtp_features, expected_features)\r
- for k in expected_features:\r
- self.assertTrue(smtp.has_extn(k))\r
- self.assertFalse(smtp.has_extn('unsupported-feature'))\r
- smtp.quit()\r
-\r
- def testVRFY(self):\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
-\r
- for email, name in sim_users.items():\r
- expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))\r
- self.assertEqual(smtp.vrfy(email), expected_known)\r
-\r
- u = 'nobody@nowhere.com'\r
- expected_unknown = (550, 'No such user: %s' % smtplib.quoteaddr(u))\r
- self.assertEqual(smtp.vrfy(u), expected_unknown)\r
- smtp.quit()\r
-\r
- def testEXPN(self):\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
-\r
- for listname, members in sim_lists.items():\r
- users = []\r
- for m in members:\r
- users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))\r
- expected_known = (250, '\n'.join(users))\r
- self.assertEqual(smtp.expn(listname), expected_known)\r
-\r
- u = 'PSU-Members-List'\r
- expected_unknown = (550, 'No access for you!')\r
- self.assertEqual(smtp.expn(u), expected_unknown)\r
- smtp.quit()\r
-\r
- def testAUTH_PLAIN(self):\r
- self.serv.add_feature("AUTH PLAIN")\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
-\r
- expected_auth_ok = (235, b'plain auth ok')\r
- self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)\r
-\r
- # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they\r
- # require a synchronous read to obtain the credentials...so instead smtpd\r
- # sees the credential sent by smtplib's login method as an unknown command,\r
- # which results in smtplib raising an auth error. Fortunately the error\r
- # message contains the encoded credential, so we can partially check that it\r
- # was generated correctly (partially, because the 'word' is uppercased in\r
- # the error message).\r
-\r
- def testAUTH_LOGIN(self):\r
- self.serv.add_feature("AUTH LOGIN")\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
- try: smtp.login(sim_auth[0], sim_auth[1])\r
- except smtplib.SMTPAuthenticationError as err:\r
- if sim_auth_login_password not in str(err):\r
- raise "expected encoded password not found in error message"\r
-\r
- def testAUTH_CRAM_MD5(self):\r
- self.serv.add_feature("AUTH CRAM-MD5")\r
- smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)\r
-\r
- try: smtp.login(sim_auth[0], sim_auth[1])\r
- except smtplib.SMTPAuthenticationError as err:\r
- if sim_auth_credentials['cram-md5'] not in str(err):\r
- raise "expected encoded credentials not found in error message"\r
-\r
- #TODO: add tests for correct AUTH method fallback now that the\r
- #test infrastructure can support it.\r
-\r
-\r
-def test_main(verbose=None):\r
- test_support.run_unittest(GeneralTests, DebuggingServerTests,\r
- NonConnectingTests,\r
- BadHELOServerTests, SMTPSimTests)\r
-\r
-if __name__ == '__main__':\r
- test_main()\r