]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_imaplib.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_imaplib.py
CommitLineData
4710c53d 1from test import test_support as support\r
2# If we end up with a significant number of tests that don't require\r
3# threading, this test module should be split. Right now we skip\r
4# them all if we don't have threading.\r
5threading = support.import_module('threading')\r
6\r
7from contextlib import contextmanager\r
8import imaplib\r
9import os.path\r
10import SocketServer\r
11import time\r
12\r
13from test_support import reap_threads, verbose, transient_internet\r
14import unittest\r
15\r
# SSL support is optional.  When the module cannot be imported, ``ssl`` is
# bound to None so the rest of the file can simply test its truth value.
try:
    import ssl
except ImportError:
    ssl = None

# Certificate file handed to ssl.wrap_socket() by the SSL test server.
# It stays None until test_main() fills it in.
CERTFILE = None
22\r
23\r
class TestImaplib(unittest.TestCase):
    """Checks of imaplib helpers that need no server connection."""

    def test_that_Time2Internaldate_returns_a_result(self):
        # Smoke test only: the formatted value depends on the timezone of
        # the machine running the test, so all that can be verified is that
        # each kind of input converts without raising.
        samples = (
            2000000000,                       # integer timestamp
            2000000000.0,                     # float timestamp
            time.localtime(2000000000),       # time tuple
            '"18-May-2033 05:33:20 +0200"',   # double-quoted date string
        )

        for sample in samples:
            imaplib.Time2Internaldate(sample)
35\r
36\r
if not ssl:

    class SecureTCPServer:
        # Placeholder so the name exists even without SSL support.
        pass

    IMAP4_SSL = None

else:

    class SecureTCPServer(SocketServer.TCPServer):
        """TCP server that wraps every accepted connection in SSL."""

        def get_request(self):
            # Accept the plain connection first, then layer server-side
            # SSL on top of it using the certificate named by CERTFILE.
            raw_socket, peer_address = self.socket.accept()
            secured = ssl.wrap_socket(raw_socket,
                                      certfile=CERTFILE,
                                      server_side=True)
            return secured, peer_address

    IMAP4_SSL = imaplib.IMAP4_SSL
56\r
57\r
58class SimpleIMAPHandler(SocketServer.StreamRequestHandler):\r
59\r
60 timeout = 1\r
61\r
62 def _send(self, message):\r
63 if verbose: print "SENT:", message.strip()\r
64 self.wfile.write(message)\r
65\r
66 def handle(self):\r
67 # Send a welcome message.\r
68 self._send('* OK IMAP4rev1\r\n')\r
69 while 1:\r
70 # Gather up input until we receive a line terminator or we timeout.\r
71 # Accumulate read(1) because it's simpler to handle the differences\r
72 # between naked sockets and SSL sockets.\r
73 line = ''\r
74 while 1:\r
75 try:\r
76 part = self.rfile.read(1)\r
77 if part == '':\r
78 # Naked sockets return empty strings..\r
79 return\r
80 line += part\r
81 except IOError:\r
82 # ..but SSLSockets throw exceptions.\r
83 return\r
84 if line.endswith('\r\n'):\r
85 break\r
86\r
87 if verbose: print 'GOT:', line.strip()\r
88 splitline = line.split()\r
89 tag = splitline[0]\r
90 cmd = splitline[1]\r
91 args = splitline[2:]\r
92\r
93 if hasattr(self, 'cmd_%s' % (cmd,)):\r
94 getattr(self, 'cmd_%s' % (cmd,))(tag, args)\r
95 else:\r
96 self._send('%s BAD %s unknown\r\n' % (tag, cmd))\r
97\r
98 def cmd_CAPABILITY(self, tag, args):\r
99 self._send('* CAPABILITY IMAP4rev1\r\n')\r
100 self._send('%s OK CAPABILITY completed\r\n' % (tag,))\r
101\r
102\r
103class BaseThreadedNetworkedTests(unittest.TestCase):\r
104\r
105 def make_server(self, addr, hdlr):\r
106\r
107 class MyServer(self.server_class):\r
108 def handle_error(self, request, client_address):\r
109 self.close_request(request)\r
110 self.server_close()\r
111 raise\r
112\r
113 if verbose: print "creating server"\r
114 server = MyServer(addr, hdlr)\r
115 self.assertEqual(server.server_address, server.socket.getsockname())\r
116\r
117 if verbose:\r
118 print "server created"\r
119 print "ADDR =", addr\r
120 print "CLASS =", self.server_class\r
121 print "HDLR =", server.RequestHandlerClass\r
122\r
123 t = threading.Thread(\r
124 name='%s serving' % self.server_class,\r
125 target=server.serve_forever,\r
126 # Short poll interval to make the test finish quickly.\r
127 # Time between requests is short enough that we won't wake\r
128 # up spuriously too many times.\r
129 kwargs={'poll_interval':0.01})\r
130 t.daemon = True # In case this function raises.\r
131 t.start()\r
132 if verbose: print "server running"\r
133 return server, t\r
134\r
135 def reap_server(self, server, thread):\r
136 if verbose: print "waiting for server"\r
137 server.shutdown()\r
138 thread.join()\r
139 if verbose: print "done"\r
140\r
141 @contextmanager\r
142 def reaped_server(self, hdlr):\r
143 server, thread = self.make_server((support.HOST, 0), hdlr)\r
144 try:\r
145 yield server\r
146 finally:\r
147 self.reap_server(server, thread)\r
148\r
149 @reap_threads\r
150 def test_connect(self):\r
151 with self.reaped_server(SimpleIMAPHandler) as server:\r
152 client = self.imap_class(*server.server_address)\r
153 client.shutdown()\r
154\r
155 @reap_threads\r
156 def test_issue5949(self):\r
157\r
158 class EOFHandler(SocketServer.StreamRequestHandler):\r
159 def handle(self):\r
160 # EOF without sending a complete welcome message.\r
161 self.wfile.write('* OK')\r
162\r
163 with self.reaped_server(EOFHandler) as server:\r
164 self.assertRaises(imaplib.IMAP4.abort,\r
165 self.imap_class, *server.server_address)\r
166\r
167\r
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    """Runs the shared threaded tests over a plain TCP connection."""

    imap_class = imaplib.IMAP4
    server_class = SocketServer.TCPServer
172\r
173\r
@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    """Runs the shared threaded tests over SSL; skipped without ssl."""

    imap_class = IMAP4_SSL
    server_class = SecureTCPServer
179\r
180\r
181class RemoteIMAPTest(unittest.TestCase):\r
182 host = 'cyrus.andrew.cmu.edu'\r
183 port = 143\r
184 username = 'anonymous'\r
185 password = 'pass'\r
186 imap_class = imaplib.IMAP4\r
187\r
188 def setUp(self):\r
189 with transient_internet(self.host):\r
190 self.server = self.imap_class(self.host, self.port)\r
191\r
192 def tearDown(self):\r
193 if self.server is not None:\r
194 self.server.logout()\r
195\r
196 def test_logincapa(self):\r
197 self.assertTrue('LOGINDISABLED' in self.server.capabilities)\r
198\r
199 def test_anonlogin(self):\r
200 self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)\r
201 rs = self.server.login(self.username, self.password)\r
202 self.assertEqual(rs[0], 'OK')\r
203\r
204 def test_logout(self):\r
205 rs = self.server.logout()\r
206 self.server = None\r
207 self.assertEqual(rs[0], 'BYE')\r
208\r
209\r
@unittest.skipUnless(ssl, "SSL not available")
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    """Repeats the remote server tests over SSL, using port 993."""
    port = 993
    imap_class = IMAP4_SSL

    def test_logincapa(self):
        # Over SSL this test expects LOGIN to be enabled and AUTH=PLAIN to
        # be advertised.  assertNotIn/assertIn name the offending
        # capability on failure.
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)
        self.assertIn('AUTH=PLAIN', self.server.capabilities)
218\r
219\r
def test_main():
    """Run the suite, adding networked tests if 'network' is enabled."""
    global CERTFILE

    tests = [TestImaplib]

    if support.is_resource_enabled('network'):
        if ssl:
            # The SSL test server reads its certificate from keycert.pem
            # in the directory that holds this file.
            here = os.path.dirname(__file__) or os.curdir
            CERTFILE = os.path.join(here, "keycert.pem")
            if not os.path.exists(CERTFILE):
                raise support.TestFailed("Can't read certificate files!")
        tests += [
            ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
            RemoteIMAPTest, RemoteIMAP_SSLTest,
        ]

    support.run_unittest(*tests)
236\r
237\r
if __name__ == "__main__":
    # Running the file directly opts in to the network-dependent tests.
    support.use_resources = ['network']
    test_main()