]>
Commit | Line | Data |
---|---|---|
4710c53d | 1 | from test import test_support as support\r |
2 | # If we end up with a significant number of tests that don't require\r | |
3 | # threading, this test module should be split. Right now we skip\r | |
4 | # them all if we don't have threading.\r | |
5 | threading = support.import_module('threading')\r | |
6 | \r | |
7 | from contextlib import contextmanager\r | |
8 | import imaplib\r | |
9 | import os.path\r | |
10 | import SocketServer\r | |
11 | import time\r | |
12 | \r | |
13 | from test_support import reap_threads, verbose, transient_internet\r | |
14 | import unittest\r | |
15 | \r | |
16 | try:\r | |
17 | import ssl\r | |
18 | except ImportError:\r | |
19 | ssl = None\r | |
20 | \r | |
21 | CERTFILE = None\r | |
22 | \r | |
23 | \r | |
24 | class TestImaplib(unittest.TestCase):\r | |
25 | \r | |
26 | def test_that_Time2Internaldate_returns_a_result(self):\r | |
27 | # We can check only that it successfully produces a result,\r | |
28 | # not the correctness of the result itself, since the result\r | |
29 | # depends on the timezone the machine is in.\r | |
30 | timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),\r | |
31 | '"18-May-2033 05:33:20 +0200"']\r | |
32 | \r | |
33 | for t in timevalues:\r | |
34 | imaplib.Time2Internaldate(t)\r | |
35 | \r | |
36 | \r | |
37 | if ssl:\r | |
38 | \r | |
39 | class SecureTCPServer(SocketServer.TCPServer):\r | |
40 | \r | |
41 | def get_request(self):\r | |
42 | newsocket, fromaddr = self.socket.accept()\r | |
43 | connstream = ssl.wrap_socket(newsocket,\r | |
44 | server_side=True,\r | |
45 | certfile=CERTFILE)\r | |
46 | return connstream, fromaddr\r | |
47 | \r | |
48 | IMAP4_SSL = imaplib.IMAP4_SSL\r | |
49 | \r | |
50 | else:\r | |
51 | \r | |
52 | class SecureTCPServer:\r | |
53 | pass\r | |
54 | \r | |
55 | IMAP4_SSL = None\r | |
56 | \r | |
57 | \r | |
58 | class SimpleIMAPHandler(SocketServer.StreamRequestHandler):\r | |
59 | \r | |
60 | timeout = 1\r | |
61 | \r | |
62 | def _send(self, message):\r | |
63 | if verbose: print "SENT:", message.strip()\r | |
64 | self.wfile.write(message)\r | |
65 | \r | |
66 | def handle(self):\r | |
67 | # Send a welcome message.\r | |
68 | self._send('* OK IMAP4rev1\r\n')\r | |
69 | while 1:\r | |
70 | # Gather up input until we receive a line terminator or we timeout.\r | |
71 | # Accumulate read(1) because it's simpler to handle the differences\r | |
72 | # between naked sockets and SSL sockets.\r | |
73 | line = ''\r | |
74 | while 1:\r | |
75 | try:\r | |
76 | part = self.rfile.read(1)\r | |
77 | if part == '':\r | |
78 | # Naked sockets return empty strings..\r | |
79 | return\r | |
80 | line += part\r | |
81 | except IOError:\r | |
82 | # ..but SSLSockets throw exceptions.\r | |
83 | return\r | |
84 | if line.endswith('\r\n'):\r | |
85 | break\r | |
86 | \r | |
87 | if verbose: print 'GOT:', line.strip()\r | |
88 | splitline = line.split()\r | |
89 | tag = splitline[0]\r | |
90 | cmd = splitline[1]\r | |
91 | args = splitline[2:]\r | |
92 | \r | |
93 | if hasattr(self, 'cmd_%s' % (cmd,)):\r | |
94 | getattr(self, 'cmd_%s' % (cmd,))(tag, args)\r | |
95 | else:\r | |
96 | self._send('%s BAD %s unknown\r\n' % (tag, cmd))\r | |
97 | \r | |
98 | def cmd_CAPABILITY(self, tag, args):\r | |
99 | self._send('* CAPABILITY IMAP4rev1\r\n')\r | |
100 | self._send('%s OK CAPABILITY completed\r\n' % (tag,))\r | |
101 | \r | |
102 | \r | |
103 | class BaseThreadedNetworkedTests(unittest.TestCase):\r | |
104 | \r | |
105 | def make_server(self, addr, hdlr):\r | |
106 | \r | |
107 | class MyServer(self.server_class):\r | |
108 | def handle_error(self, request, client_address):\r | |
109 | self.close_request(request)\r | |
110 | self.server_close()\r | |
111 | raise\r | |
112 | \r | |
113 | if verbose: print "creating server"\r | |
114 | server = MyServer(addr, hdlr)\r | |
115 | self.assertEqual(server.server_address, server.socket.getsockname())\r | |
116 | \r | |
117 | if verbose:\r | |
118 | print "server created"\r | |
119 | print "ADDR =", addr\r | |
120 | print "CLASS =", self.server_class\r | |
121 | print "HDLR =", server.RequestHandlerClass\r | |
122 | \r | |
123 | t = threading.Thread(\r | |
124 | name='%s serving' % self.server_class,\r | |
125 | target=server.serve_forever,\r | |
126 | # Short poll interval to make the test finish quickly.\r | |
127 | # Time between requests is short enough that we won't wake\r | |
128 | # up spuriously too many times.\r | |
129 | kwargs={'poll_interval':0.01})\r | |
130 | t.daemon = True # In case this function raises.\r | |
131 | t.start()\r | |
132 | if verbose: print "server running"\r | |
133 | return server, t\r | |
134 | \r | |
135 | def reap_server(self, server, thread):\r | |
136 | if verbose: print "waiting for server"\r | |
137 | server.shutdown()\r | |
138 | thread.join()\r | |
139 | if verbose: print "done"\r | |
140 | \r | |
141 | @contextmanager\r | |
142 | def reaped_server(self, hdlr):\r | |
143 | server, thread = self.make_server((support.HOST, 0), hdlr)\r | |
144 | try:\r | |
145 | yield server\r | |
146 | finally:\r | |
147 | self.reap_server(server, thread)\r | |
148 | \r | |
149 | @reap_threads\r | |
150 | def test_connect(self):\r | |
151 | with self.reaped_server(SimpleIMAPHandler) as server:\r | |
152 | client = self.imap_class(*server.server_address)\r | |
153 | client.shutdown()\r | |
154 | \r | |
155 | @reap_threads\r | |
156 | def test_issue5949(self):\r | |
157 | \r | |
158 | class EOFHandler(SocketServer.StreamRequestHandler):\r | |
159 | def handle(self):\r | |
160 | # EOF without sending a complete welcome message.\r | |
161 | self.wfile.write('* OK')\r | |
162 | \r | |
163 | with self.reaped_server(EOFHandler) as server:\r | |
164 | self.assertRaises(imaplib.IMAP4.abort,\r | |
165 | self.imap_class, *server.server_address)\r | |
166 | \r | |
167 | \r | |
168 | class ThreadedNetworkedTests(BaseThreadedNetworkedTests):\r | |
169 | \r | |
170 | server_class = SocketServer.TCPServer\r | |
171 | imap_class = imaplib.IMAP4\r | |
172 | \r | |
173 | \r | |
174 | @unittest.skipUnless(ssl, "SSL not available")\r | |
175 | class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):\r | |
176 | \r | |
177 | server_class = SecureTCPServer\r | |
178 | imap_class = IMAP4_SSL\r | |
179 | \r | |
180 | \r | |
181 | class RemoteIMAPTest(unittest.TestCase):\r | |
182 | host = 'cyrus.andrew.cmu.edu'\r | |
183 | port = 143\r | |
184 | username = 'anonymous'\r | |
185 | password = 'pass'\r | |
186 | imap_class = imaplib.IMAP4\r | |
187 | \r | |
188 | def setUp(self):\r | |
189 | with transient_internet(self.host):\r | |
190 | self.server = self.imap_class(self.host, self.port)\r | |
191 | \r | |
192 | def tearDown(self):\r | |
193 | if self.server is not None:\r | |
194 | self.server.logout()\r | |
195 | \r | |
196 | def test_logincapa(self):\r | |
197 | self.assertTrue('LOGINDISABLED' in self.server.capabilities)\r | |
198 | \r | |
199 | def test_anonlogin(self):\r | |
200 | self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)\r | |
201 | rs = self.server.login(self.username, self.password)\r | |
202 | self.assertEqual(rs[0], 'OK')\r | |
203 | \r | |
204 | def test_logout(self):\r | |
205 | rs = self.server.logout()\r | |
206 | self.server = None\r | |
207 | self.assertEqual(rs[0], 'BYE')\r | |
208 | \r | |
209 | \r | |
210 | @unittest.skipUnless(ssl, "SSL not available")\r | |
211 | class RemoteIMAP_SSLTest(RemoteIMAPTest):\r | |
212 | port = 993\r | |
213 | imap_class = IMAP4_SSL\r | |
214 | \r | |
215 | def test_logincapa(self):\r | |
216 | self.assertFalse('LOGINDISABLED' in self.server.capabilities)\r | |
217 | self.assertTrue('AUTH=PLAIN' in self.server.capabilities)\r | |
218 | \r | |
219 | \r | |
220 | def test_main():\r | |
221 | tests = [TestImaplib]\r | |
222 | \r | |
223 | if support.is_resource_enabled('network'):\r | |
224 | if ssl:\r | |
225 | global CERTFILE\r | |
226 | CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,\r | |
227 | "keycert.pem")\r | |
228 | if not os.path.exists(CERTFILE):\r | |
229 | raise support.TestFailed("Can't read certificate files!")\r | |
230 | tests.extend([\r | |
231 | ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,\r | |
232 | RemoteIMAPTest, RemoteIMAP_SSLTest,\r | |
233 | ])\r | |
234 | \r | |
235 | support.run_unittest(*tests)\r | |
236 | \r | |
237 | \r | |
238 | if __name__ == "__main__":\r | |
239 | support.use_resources = ['network']\r | |
240 | test_main()\r |