]> git.proxmox.com Git - mirror_edk2.git/blame - AppPkg/Applications/Python/Python-2.7.2/Lib/test/test_xmlrpc.py
EmbeddedPkg: Extend NvVarStoreFormattedLib LIBRARY_CLASS
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Lib / test / test_xmlrpc.py
CommitLineData
4710c53d 1import base64\r
2import datetime\r
3import sys\r
4import time\r
5import unittest\r
6import xmlrpclib\r
7import SimpleXMLRPCServer\r
8import mimetools\r
9import httplib\r
10import socket\r
11import StringIO\r
12import os\r
13import re\r
14from test import test_support\r
15\r
16try:\r
17 import threading\r
18except ImportError:\r
19 threading = None\r
20\r
# Probe for the ``unicode`` builtin; some tests branch on its availability.
try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False
27\r
28alist = [{'astring': 'foo@bar.baz.spam',\r
29 'afloat': 7283.43,\r
30 'anint': 2**20,\r
31 'ashortlong': 2L,\r
32 'anotherlist': ['.zyx.41'],\r
33 'abase64': xmlrpclib.Binary("my dog has fleas"),\r
34 'boolean': xmlrpclib.False,\r
35 'unicode': u'\u4000\u6000\u8000',\r
36 u'ukey\u4000': 'regular value',\r
37 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),\r
38 'datetime2': xmlrpclib.DateTime(\r
39 (2005, 02, 10, 11, 41, 23, 0, 1, -1)),\r
40 'datetime3': xmlrpclib.DateTime(\r
41 datetime.datetime(2005, 02, 10, 11, 41, 23)),\r
42 }]\r
43\r
44class XMLRPCTestCase(unittest.TestCase):\r
45\r
46 def test_dump_load(self):\r
47 self.assertEqual(alist,\r
48 xmlrpclib.loads(xmlrpclib.dumps((alist,)))[0][0])\r
49\r
50 def test_dump_bare_datetime(self):\r
51 # This checks that an unwrapped datetime.date object can be handled\r
52 # by the marshalling code. This can't be done via test_dump_load()\r
53 # since with use_datetime set to 1 the unmarshaller would create\r
54 # datetime objects for the 'datetime[123]' keys as well\r
55 dt = datetime.datetime(2005, 02, 10, 11, 41, 23)\r
56 s = xmlrpclib.dumps((dt,))\r
57 (newdt,), m = xmlrpclib.loads(s, use_datetime=1)\r
58 self.assertEqual(newdt, dt)\r
59 self.assertEqual(m, None)\r
60\r
61 (newdt,), m = xmlrpclib.loads(s, use_datetime=0)\r
62 self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))\r
63\r
64 def test_datetime_before_1900(self):\r
65 # same as before but with a date before 1900\r
66 dt = datetime.datetime(1, 02, 10, 11, 41, 23)\r
67 s = xmlrpclib.dumps((dt,))\r
68 (newdt,), m = xmlrpclib.loads(s, use_datetime=1)\r
69 self.assertEqual(newdt, dt)\r
70 self.assertEqual(m, None)\r
71\r
72 (newdt,), m = xmlrpclib.loads(s, use_datetime=0)\r
73 self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))\r
74\r
75 def test_cmp_datetime_DateTime(self):\r
76 now = datetime.datetime.now()\r
77 dt = xmlrpclib.DateTime(now.timetuple())\r
78 self.assertTrue(dt == now)\r
79 self.assertTrue(now == dt)\r
80 then = now + datetime.timedelta(seconds=4)\r
81 self.assertTrue(then >= dt)\r
82 self.assertTrue(dt < then)\r
83\r
84 def test_bug_1164912 (self):\r
85 d = xmlrpclib.DateTime()\r
86 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),\r
87 methodresponse=True))\r
88 self.assertIsInstance(new_d.value, str)\r
89\r
90 # Check that the output of dumps() is still an 8-bit string\r
91 s = xmlrpclib.dumps((new_d,), methodresponse=True)\r
92 self.assertIsInstance(s, str)\r
93\r
94 def test_newstyle_class(self):\r
95 class T(object):\r
96 pass\r
97 t = T()\r
98 t.x = 100\r
99 t.y = "Hello"\r
100 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))\r
101 self.assertEqual(t2, t.__dict__)\r
102\r
103 def test_dump_big_long(self):\r
104 self.assertRaises(OverflowError, xmlrpclib.dumps, (2L**99,))\r
105\r
106 def test_dump_bad_dict(self):\r
107 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))\r
108\r
109 def test_dump_recursive_seq(self):\r
110 l = [1,2,3]\r
111 t = [3,4,5,l]\r
112 l.append(t)\r
113 self.assertRaises(TypeError, xmlrpclib.dumps, (l,))\r
114\r
115 def test_dump_recursive_dict(self):\r
116 d = {'1':1, '2':1}\r
117 t = {'3':3, 'd':d}\r
118 d['t'] = t\r
119 self.assertRaises(TypeError, xmlrpclib.dumps, (d,))\r
120\r
121 def test_dump_big_int(self):\r
122 if sys.maxint > 2L**31-1:\r
123 self.assertRaises(OverflowError, xmlrpclib.dumps,\r
124 (int(2L**34),))\r
125\r
126 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))\r
127 self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MAXINT+1,))\r
128 self.assertRaises(OverflowError, xmlrpclib.dumps, (xmlrpclib.MININT-1,))\r
129\r
130 def dummy_write(s):\r
131 pass\r
132\r
133 m = xmlrpclib.Marshaller()\r
134 m.dump_int(xmlrpclib.MAXINT, dummy_write)\r
135 m.dump_int(xmlrpclib.MININT, dummy_write)\r
136 self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MAXINT+1, dummy_write)\r
137 self.assertRaises(OverflowError, m.dump_int, xmlrpclib.MININT-1, dummy_write)\r
138\r
139\r
140 def test_dump_none(self):\r
141 value = alist + [None]\r
142 arg1 = (alist + [None],)\r
143 strg = xmlrpclib.dumps(arg1, allow_none=True)\r
144 self.assertEqual(value,\r
145 xmlrpclib.loads(strg)[0][0])\r
146 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))\r
147\r
148 def test_default_encoding_issues(self):\r
149 # SF bug #1115989: wrong decoding in '_stringify'\r
150 utf8 = """<?xml version='1.0' encoding='iso-8859-1'?>\r
151 <params>\r
152 <param><value>\r
153 <string>abc \x95</string>\r
154 </value></param>\r
155 <param><value>\r
156 <struct>\r
157 <member>\r
158 <name>def \x96</name>\r
159 <value><string>ghi \x97</string></value>\r
160 </member>\r
161 </struct>\r
162 </value></param>\r
163 </params>\r
164 """\r
165\r
166 # sys.setdefaultencoding() normally doesn't exist after site.py is\r
167 # loaded. Import a temporary fresh copy to get access to it\r
168 # but then restore the original copy to avoid messing with\r
169 # other potentially modified sys module attributes\r
170 old_encoding = sys.getdefaultencoding()\r
171 with test_support.CleanImport('sys'):\r
172 import sys as temp_sys\r
173 temp_sys.setdefaultencoding("iso-8859-1")\r
174 try:\r
175 (s, d), m = xmlrpclib.loads(utf8)\r
176 finally:\r
177 temp_sys.setdefaultencoding(old_encoding)\r
178\r
179 items = d.items()\r
180 if have_unicode:\r
181 self.assertEqual(s, u"abc \x95")\r
182 self.assertIsInstance(s, unicode)\r
183 self.assertEqual(items, [(u"def \x96", u"ghi \x97")])\r
184 self.assertIsInstance(items[0][0], unicode)\r
185 self.assertIsInstance(items[0][1], unicode)\r
186 else:\r
187 self.assertEqual(s, "abc \xc2\x95")\r
188 self.assertEqual(items, [("def \xc2\x96", "ghi \xc2\x97")])\r
189\r
190\r
class HelperTestCase(unittest.TestCase):
    """Tests for module-level helper functions of xmlrpclib."""

    def test_escape(self):
        # Each of the three characters handled here maps to its entity.
        cases = [
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        ]
        for raw, escaped in cases:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
196\r
class FaultTestCase(unittest.TestCase):
    """Tests for xmlrpclib.Fault: repr and marshalling."""

    def test_repr(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(fault), "<Fault 42: 'Test Fault'>")
        self.assertEqual(repr(fault), str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Passed as an ordinary parameter, the fault loads back as a struct.
        xml = xmlrpclib.dumps((fault,))
        (loaded,), method = xmlrpclib.loads(xml)
        self.assertEqual(loaded,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(method, None)

        # Dumped on its own through a Marshaller, loading it raises Fault.
        xml = xmlrpclib.Marshaller().dumps(fault)
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, xml)
212\r
213\r
class DateTimeTestCase(unittest.TestCase):
    """Tests for xmlrpclib.DateTime construction, formatting and decoding."""

    def test_default(self):
        # Smoke test: construction without arguments must not raise.
        t = xmlrpclib.DateTime()

    def test_time(self):
        # A float is interpreted as seconds since the epoch, in local time.
        d = 1181399930.036952
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t),
                         time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))

    def test_time_tuple(self):
        d = (2007,6,9,10,38,50,5,160,0)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), '20070609T10:38:50')

    def test_time_struct(self):
        d = time.localtime(1181399930.036952)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))

    def test_datetime_datetime(self):
        d = datetime.datetime(2007,1,2,3,4,5)
        t = xmlrpclib.DateTime(d)
        self.assertEqual(str(t), '20070102T03:04:05')

    def test_repr(self):
        d = datetime.datetime(2007,1,2,3,4,5)
        t = xmlrpclib.DateTime(d)
        val ="<DateTime '20070102T03:04:05' at %x>" % id(t)
        self.assertEqual(repr(t), val)

    def test_decode(self):
        # The value is padded with whitespace on both sides on purpose.
        d = ' 20070908T07:11:13  '
        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))

        t1 = xmlrpclib.DateTime()
        t1.decode(d)
        self.assertEqual(t1, tref)

        # The module-level helper must produce an equal object.  The result
        # of _datetime() is what is compared here; previously t1 was
        # compared a second time and t2 was never checked.
        t2 = xmlrpclib._datetime(d)
        self.assertEqual(t2, tref)
253\r
class BinaryTestCase(unittest.TestCase):
    """Tests for the xmlrpclib.Binary wrapper."""

    # Payload containing control bytes, ASCII text and high bytes.
    RAW = '\x01\x02\x03abc123\xff\xfe'

    def test_default(self):
        empty = xmlrpclib.Binary()
        self.assertEqual(str(empty), '')

    def test_string(self):
        wrapped = xmlrpclib.Binary(self.RAW)
        self.assertEqual(str(wrapped), self.RAW)

    def test_decode(self):
        encoded = base64.encodestring(self.RAW)

        # Decode through the instance method ...
        via_method = xmlrpclib.Binary()
        via_method.decode(encoded)
        self.assertEqual(str(via_method), self.RAW)

        # ... and through the module-level helper.
        via_helper = xmlrpclib._binary(encoded)
        self.assertEqual(str(via_helper), self.RAW)
273\r
274\r
# Address of the currently running test server; assigned by the server
# thread functions below.
ADDR = None
PORT = None
URL = None
276\r
# The evt is set twice.  First when the server is ready to serve.
# Second when the server has been shut down.  The user must clear
# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests, requestHandler=None):
    """Run a SimpleXMLRPCServer on localhost for up to 'numrequests' requests.

    Publishes the bound address through the module globals ADDR, PORT and
    URL, sets 'evt' once the server is ready, and sets it again after the
    listening socket has been closed.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode explicitly.
            # On Linux, socket attributes are not inherited like they are
            # on *BSD and Windows.
            conn, peer = self.socket.accept()
            conn.setblocking(True)
            return conn, peer

    handler_class = (requestHandler or
                     SimpleXMLRPCServer.SimpleXMLRPCRequestHandler)
    serv = MyXMLRPCServer(("localhost", 0), handler_class,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()

        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x,y: x+y, 'add')
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass())
        evt.set()

        # handle up to 'numrequests' requests
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1
    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
335\r
def http_multi_server(evt, numrequests, requestHandler=None):
    """Run a MultiPathXMLRPCServer on localhost for up to 'numrequests' requests.

    Two dispatchers are installed: "/foo" exposes pow() and "/foo/bar"
    exposes add(); both also get the introspection and multicall functions.
    The bound address is published through the module globals ADDR, PORT
    and URL.  'evt' is set once the server is ready and again after the
    listening socket has been closed.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(SimpleXMLRPCServer.MultiPathXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode explicitly.
            # On Linux, socket attributes are not inherited like they are
            # on *BSD and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler

    class MyRequestHandler(requestHandler):
        # An empty list lifts the handler's own path restriction.
        rpc_paths = []

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        # Binding happens inside the try block (as in http_server) so that
        # a failure here still closes the socket and sets 'evt' instead of
        # leaving the waiting test without a signal.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly.  This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6.  The server listens on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, SimpleXMLRPCServer.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
395\r
396# This function prevents errors like:\r
397# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>\r
def is_unavailable_exception(e):
    '''Tell whether 'e' looks like a 'temporarily unavailable' failure.

    Returns True when 'e' has an errcode of -1 or headers that are None, or
    when its 'X-exception' header (or, for objects lacking those
    attributes, str(e)) contains 'temporarily unavailable' in any letter
    case.  Returns False otherwise.'''

    try:
        # sometimes we get a -1 error code and/or empty headers
        lacks_details = e.errcode == -1 or e.headers is None
        if not lacks_details:
            detail = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError-like object (e.g. a socket.error): fall back
        # to its string form.
        lacks_details = False
        detail = str(e)

    if lacks_details:
        return True
    return bool(detail) and 'temporarily unavailable' in detail.lower()
416\r
@unittest.skipUnless(threading, 'Threading required for this test.')
class BaseServerTestCase(unittest.TestCase):
    """Base class that runs a test server in a thread around every test.

    Subclasses may override 'requestHandler', 'request_count' and
    'threadFunc' to change the handler class, the number of requests the
    server handles, and the server function itself.
    """
    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # enable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        # start server thread to handle requests
        self.evt = threading.Event()
        server_thread = threading.Thread(
            target=self.threadFunc,
            args=(self.evt, self.request_count, self.requestHandler))
        server_thread.start()

        # wait for the server to be ready, then re-arm the event so that
        # tearDown() can wait for the shutdown notification
        self.evt.wait(10)
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait(10)

        # disable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
442\r
# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer.  This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
449\r
class SimpleServerTestCase(BaseServerTestCase):
    """Client/server tests against the single-path server from http_server().

    Every test tolerates 'temporarily unavailable' failures (see
    is_unavailable_exception); any other ProtocolError or socket.error is
    reported as a test failure together with the response headers.
    """

    def _fail_unless_unavailable(self, exc):
        # Shared error policy for all tests below: ignore failures due to
        # non-blocking socket 'unavailable' errors, otherwise fail and
        # provide additional information in the test output.
        if not is_unavailable_exception(exc):
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii(self):
        # NOTE(review): these are plain (byte) string literals; in Python 2
        # the \N{...} escape is only interpreted in unicode literals, so
        # the values sent are presumably pure ASCII.  Confirm whether u''
        # literals were intended.
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'

        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    # [ch] The test 404 is causing lots of false alarms.
    def XXXtest_404(self):
        # send POST with httplib, it should return 404 header and
        # 'Not Found' message.
        conn = httplib.HTTPConnection(ADDR, PORT)
        conn.request('POST', '/this-is-not-valid')
        response = conn.getresponse()
        conn.close()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            expected_methods = set(['pow', 'div', 'my_function', 'add',
                                    'system.listMethods', 'system.methodHelp',
                                    'system.methodSignature', 'system.multicall'])
            self.assertEqual(set(meth), expected_methods)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    @unittest.skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<type \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<type \'exceptions.Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            self._fail_unless_unavailable(e)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add')

        self.assertTrue(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()
588\r
class MultiPathServerTestCase(BaseServerTestCase):
    """Checks that each path of the multi-path server has its own methods."""
    threadFunc = staticmethod(http_multi_server)
    # Each test issues two calls: one that succeeds and one that faults.
    request_count = 2

    def test_path1(self):
        # "/foo" offers pow() but not add().
        proxy = xmlrpclib.ServerProxy(URL+"/foo")
        self.assertEqual(proxy.pow(6,8), 6**8)
        self.assertRaises(xmlrpclib.Fault, proxy.add, 6, 8)

    def test_path2(self):
        # "/foo/bar" offers add() but not pow().
        proxy = xmlrpclib.ServerProxy(URL+"/foo/bar")
        self.assertEqual(proxy.add(6,8), 6+8)
        self.assertRaises(xmlrpclib.Fault, proxy.pow, 6, 8)
600\r
601#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism\r
602#does indeed serve subsequent requests on the same connection\r
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    """Base for keep-alive tests: installs a handler that logs its requests."""

    class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
        # HTTP/1.1 handler that records, per handler instance, the raw
        # request line of every request it served.  The log lives in the
        # class variable 'myRequests' as a list of lists.
        parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            # Open a fresh log entry for this handler and remember its index.
            self.myRequests.append([])
            self.reqidx = len(self.myRequests)-1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        # Start every test with an empty request log.
        self.RequestHandler.myRequests = []
        return BaseServerTestCase.setUp(self)
624\r
625#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism\r
626#does indeed serve subsequent requests on the same connection\r
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    """Several calls through one proxy must share a single request handler."""

    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # do three requests.
        for _attempt in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)

        log = self.RequestHandler.myRequests

        # they should have all been handled by a single request handler
        self.assertEqual(len(log), 1)

        # check that we did at least two (the third may be pending append
        # due to thread scheduling)
        self.assertGreaterEqual(len(log[-1]), 2)
641\r
642#test special attribute access on the serverproxy, through the __call__\r
643#function.\r
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    """Special attribute access on the proxy, via its __call__ interface."""

    # ask for two keepalive requests to be handled.
    request_count=2

    def _pow_three_times(self, proxy):
        # Issue three identical calls and check each result.
        for _attempt in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # do some requests with close.
        self._pow_three_times(proxy)
        proxy("close")() #this should trigger a new keep-alive request
        self._pow_three_times(proxy)

        # there should have been two request handlers, each having logged
        # at least two complete requests
        log = self.RequestHandler.myRequests
        self.assertEqual(len(log), 2)
        self.assertGreaterEqual(len(log[-1]), 2)
        self.assertGreaterEqual(len(log[-2]), 2)

    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        # do some requests with close.
        self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("transport").close() #same as above, really.
        self.assertEqual(proxy.pow(6,8), 6**8)
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
672\r
673#A test case that verifies that gzip encoding works in both directions\r
674#(for a request and the response)\r
class GzipServerTestCase(BaseServerTestCase):
    """Checks gzip encoding of requests and of responses."""

    class RequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
        # HTTP/1.1 request handler that stores the Content-Length of the
        # last POST it received in the class attribute 'content_length'.
        parentClass = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        a = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        b = self.RequestHandler.content_length
        # The second, encoded request body must be the shorter one.
        self.assertGreater(a, b)

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        # The body is not actually gzipped, so a 400 error is expected.
        cm = self.assertRaisesRegexp(xmlrpclib.ProtocolError,
                                     re.compile(r"\b400\b"))
        with cm:
            p.pow(6, 8)

    # The method name ("gsip") is kept as is so that existing references to
    # this test by name keep working.
    def test_gsip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        old = self.requestHandler.encode_threshold
        try:
            self.requestHandler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            a = t.response_length
            self.requestHandler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            b = t.response_length
        finally:
            # Restore the class-level setting even when a call or an
            # assertion above fails, so later tests are not affected.
            self.requestHandler.encode_threshold = old
        self.assertGreater(a, b)
737\r
738#Test special attributes of the ServerProxy object\r
class ServerProxyTestCase(unittest.TestCase):
    """Tests of the special attributes reachable by calling a ServerProxy."""

    def setUp(self):
        unittest.TestCase.setUp(self)
        # Without threading, http_server() and http_multi_server() will not
        # be executed and URL is still equal to None.  'http://' is just
        # enough to choose the scheme (HTTP).
        self.url = URL if threading else 'http://'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        closer = proxy('close')
        self.assertEqual(closer(), None)

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
758\r
759# This is a contrived way to make a failure occur on the server side\r
760# in order to test the _send_traceback_header flag on the server\r
class FailingMessageClass(mimetools.Message):
    """Message class that reports an unusable 'content-length' header."""

    def __getitem__(self, key):
        lowered = key.lower()
        if lowered != 'content-length':
            return mimetools.Message.__getitem__(self, lowered)
        # Deliberately not a number.
        return 'I am broken'
767\r
768\r
@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
    """Tests of the server's _send_traceback_header flag.

    A failure is provoked on the server side by installing
    FailingMessageClass as the request handler's message class.
    """

    def setUp(self):
        self.evt = threading.Event()
        # start server thread to handle requests
        serv_args = (self.evt, 1)
        threading.Thread(target=http_server, args=serv_args).start()

        # wait for the server to be ready; bounded (like in
        # BaseServerTestCase) so that a server that never starts cannot
        # hang the test run
        self.evt.wait(10)
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate (bounded, see setUp)
        self.evt.wait(10)
        # reset flag
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
        # reset message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message

    def test_basic(self):
        # check that flag is false by default
        flagval = SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # The two server-side error headers shouldn't be sent back in this case
                self.assertIsNone(e.headers.get("X-exception"))
                self.assertIsNone(e.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("x-exception"), expected_err)
                self.assertIsNotNone(e.headers.get("x-traceback"))
        else:
            self.fail('ProtocolError not raised')
843\r
class CGIHandlerTestCase(unittest.TestCase):
    """Tests for SimpleXMLRPCServer.CGIXMLRPCRequestHandler."""

    def setUp(self):
        self.cgi = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with test_support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs
            # handle_get; capture what it prints
            with test_support.captured_stdout() as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            words = data_out.read().split()
            status = words[1]
            message = ' '.join(words[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')

    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with test_support.EnvironmentVarGuard() as env:
            with test_support.captured_stdout() as data_out:
                with test_support.captured_stdin() as data_in:
                    # Feed the request through stdin, as a CGI script
                    # would receive it.
                    data_in.write(data)
                    data_in.seek(0)
                    env['CONTENT_LENGTH'] = str(len(data))
                    self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # start with 44th char so as not to get http header, we just need only xml
        body_only = handle[44:]
        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, body_only)

        # Also test the content-length returned by handle_request
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040

        content = handle[handle.find("<?xml"):]
        declared = re.search('Content-Length: (\d+)', handle).group(1)

        self.assertEqual(int(declared), len(content))
909\r
910\r
class FakeSocket:
    """Minimal socket stand-in that records everything written to it.

    Written data accumulates in an in-memory StringIO buffer and can be
    read back with getvalue().  Reading is not supported: makefile()
    always raises RuntimeError.
    """

    def __init__(self):
        # in-memory buffer collecting every chunk passed to send()/sendall()
        self.data = StringIO.StringIO()

    def send(self, buf):
        """Record buf and report all of it as sent."""
        self.data.write(buf)
        sent = len(buf)
        return sent

    def sendall(self, buf):
        """Record buf; returns nothing."""
        self.data.write(buf)

    def getvalue(self):
        """Return everything recorded so far as a single string."""
        recorded = self.data.getvalue()
        return recorded

    def makefile(self, x='r', y=-1):
        """Always fail: this socket has no response to read from."""
        raise RuntimeError()

    def close(self):
        """Do nothing; there is no real resource to release."""
        return None
931\r
class FakeTransport(xmlrpclib.Transport):
    """Transport that captures the outgoing request instead of sending it.

    The connection created by xmlrpclib.Transport gets a FakeSocket
    installed as its socket, so whatever httplib writes is recorded
    rather than transmitted.  The same FakeSocket is kept on the
    transport as ``fake_socket`` so the recorded request can be read
    back.  No response is ever provided.
    """

    def make_connection(self, host):
        """Return the base-class connection with a recording socket attached."""
        connection = xmlrpclib.Transport.make_connection(self, host)
        recorder = FakeSocket()
        connection.sock = recorder
        self.fake_socket = recorder
        return connection
944\r
class TransportSubclassTestCase(unittest.TestCase):
    """Check that overriding a Transport send_* hook changes the request.

    Every test defines a FakeTransport subclass that overrides one hook to
    add an "X-Test" header, issues a call through it, and looks for that
    header in the recorded request text.
    """

    def issue_request(self, transport_class):
        """Return an HTTP request made via transport_class."""
        recording_transport = transport_class()
        server = xmlrpclib.ServerProxy("http://example.com/",
                                       transport=recording_transport)
        try:
            server.pow(6, 8)
        except RuntimeError:
            # the call was cut short by a RuntimeError; hand back whatever
            # the fake socket recorded up to that point
            recorded = recording_transport.fake_socket.getvalue()
        else:
            # the call completed without the RuntimeError: nothing to report
            recorded = None
        return recorded

    def test_custom_user_agent(self):
        # header added right after the base class sent the user agent
        class UserAgentTransport(FakeTransport):

            def send_user_agent(self, conn):
                xmlrpclib.Transport.send_user_agent(self, conn)
                conn.putheader("X-Test", "test_custom_user_agent")

        request_text = self.issue_request(UserAgentTransport)
        self.assertIn("X-Test: test_custom_user_agent\r\n", request_text)

    def test_send_host(self):
        # header added right after the base class sent the host
        class HostTransport(FakeTransport):

            def send_host(self, conn, host):
                xmlrpclib.Transport.send_host(self, conn, host)
                conn.putheader("X-Test", "test_send_host")

        request_text = self.issue_request(HostTransport)
        self.assertIn("X-Test: test_send_host\r\n", request_text)

    def test_send_request(self):
        # header added right after the base class sent the request line
        class RequestTransport(FakeTransport):

            def send_request(self, conn, url, body):
                xmlrpclib.Transport.send_request(self, conn, url, body)
                conn.putheader("X-Test", "test_send_request")

        request_text = self.issue_request(RequestTransport)
        self.assertIn("X-Test: test_send_request\r\n", request_text)

    def test_send_content(self):
        # header added just before the base class sends the content
        class ContentTransport(FakeTransport):

            def send_content(self, conn, body):
                conn.putheader("X-Test", "test_send_content")
                xmlrpclib.Transport.send_content(self, conn, body)

        request_text = self.issue_request(ContentTransport)
        self.assertIn("X-Test: test_send_content\r\n", request_text)
997\r
@test_support.reap_threads
def test_main():
    """Collect this module's test case classes and run them.

    GzipServerTestCase is included only when the gzip module can be
    imported.  The classes are handed to test_support.run_unittest in the
    order they are listed here.
    """
    cases = [
        XMLRPCTestCase, HelperTestCase, DateTimeTestCase,
        BinaryTestCase, FaultTestCase, TransportSubclassTestCase,
        SimpleServerTestCase,
        KeepaliveServerTestCase1,
        KeepaliveServerTestCase2,
    ]
    try:
        import gzip
    except ImportError:
        pass #gzip not supported in this build
    else:
        cases.append(GzipServerTestCase)
    cases += [
        MultiPathServerTestCase,
        ServerProxyTestCase,
        FailingServerTestCase,
        CGIHandlerTestCase,
    ]

    test_support.run_unittest(*cases)


# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()