1import base64
2import datetime
3import decimal
4import sys
5import time
6import unittest
7from unittest import mock
8import xmlrpc.client as xmlrpclib
9import xmlrpc.server
10import http.client
11import http, http.server
12import socket
13import threading
14import re
15import io
16import contextlib
17from test import support
18
19try:
20    import gzip
21except ImportError:
22    gzip = None
23
# Shared fixture: a one-element list holding a struct with a mix of value
# types (text, numbers, a nested list, binary data in three forms, a
# boolean, non-ASCII text and key, and DateTime objects).
alist = [
    {
        'astring': 'foo@bar.baz.spam',
        'afloat': 7283.43,
        'anint': 2 ** 20,
        'ashortlong': 2,
        'anotherlist': ['.zyx.41'],
        'abase64': xmlrpclib.Binary(b"my dog has fleas"),
        'b64bytes': b"my dog has fleas",
        'b64bytearray': bytearray(b"my dog has fleas"),
        'boolean': False,
        'unicode': '\u4000\u6000\u8000',
        'ukey\u4000': 'regular value',
        # DateTime built from a string, a time tuple and a datetime object.
        'datetime1': xmlrpclib.DateTime('20050210T11:41:23'),
        'datetime2': xmlrpclib.DateTime((2005, 2, 10, 11, 41, 23, 0, 1, -1)),
        'datetime3': xmlrpclib.DateTime(
            datetime.datetime(2005, 2, 10, 11, 41, 23)),
    },
]
41
42class XMLRPCTestCase(unittest.TestCase):
43
44    def test_dump_load(self):
45        dump = xmlrpclib.dumps((alist,))
46        load = xmlrpclib.loads(dump)
47        self.assertEqual(alist, load[0][0])
48
49    def test_dump_bare_datetime(self):
50        # This checks that an unwrapped datetime.date object can be handled
51        # by the marshalling code.  This can't be done via test_dump_load()
52        # since with use_builtin_types set to 1 the unmarshaller would create
53        # datetime objects for the 'datetime[123]' keys as well
54        dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
55        self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
56        s = xmlrpclib.dumps((dt,))
57
58        result, m = xmlrpclib.loads(s, use_builtin_types=True)
59        (newdt,) = result
60        self.assertEqual(newdt, dt)
61        self.assertIs(type(newdt), datetime.datetime)
62        self.assertIsNone(m)
63
64        result, m = xmlrpclib.loads(s, use_builtin_types=False)
65        (newdt,) = result
66        self.assertEqual(newdt, dt)
67        self.assertIs(type(newdt), xmlrpclib.DateTime)
68        self.assertIsNone(m)
69
70        result, m = xmlrpclib.loads(s, use_datetime=True)
71        (newdt,) = result
72        self.assertEqual(newdt, dt)
73        self.assertIs(type(newdt), datetime.datetime)
74        self.assertIsNone(m)
75
76        result, m = xmlrpclib.loads(s, use_datetime=False)
77        (newdt,) = result
78        self.assertEqual(newdt, dt)
79        self.assertIs(type(newdt), xmlrpclib.DateTime)
80        self.assertIsNone(m)
81
82
83    def test_datetime_before_1900(self):
84        # same as before but with a date before 1900
85        dt = datetime.datetime(1,  2, 10, 11, 41, 23)
86        self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
87        s = xmlrpclib.dumps((dt,))
88
89        result, m = xmlrpclib.loads(s, use_builtin_types=True)
90        (newdt,) = result
91        self.assertEqual(newdt, dt)
92        self.assertIs(type(newdt), datetime.datetime)
93        self.assertIsNone(m)
94
95        result, m = xmlrpclib.loads(s, use_builtin_types=False)
96        (newdt,) = result
97        self.assertEqual(newdt, dt)
98        self.assertIs(type(newdt), xmlrpclib.DateTime)
99        self.assertIsNone(m)
100
101    def test_bug_1164912 (self):
102        d = xmlrpclib.DateTime()
103        ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),
104                                            methodresponse=True))
105        self.assertIsInstance(new_d.value, str)
106
107        # Check that the output of dumps() is still an 8-bit string
108        s = xmlrpclib.dumps((new_d,), methodresponse=True)
109        self.assertIsInstance(s, str)
110
111    def test_newstyle_class(self):
112        class T(object):
113            pass
114        t = T()
115        t.x = 100
116        t.y = "Hello"
117        ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
118        self.assertEqual(t2, t.__dict__)
119
120    def test_dump_big_long(self):
121        self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
122
123    def test_dump_bad_dict(self):
124        self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
125
126    def test_dump_recursive_seq(self):
127        l = [1,2,3]
128        t = [3,4,5,l]
129        l.append(t)
130        self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
131
132    def test_dump_recursive_dict(self):
133        d = {'1':1, '2':1}
134        t = {'3':3, 'd':d}
135        d['t'] = t
136        self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
137
138    def test_dump_big_int(self):
139        if sys.maxsize > 2**31-1:
140            self.assertRaises(OverflowError, xmlrpclib.dumps,
141                              (int(2**34),))
142
143        xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
144        self.assertRaises(OverflowError, xmlrpclib.dumps,
145                          (xmlrpclib.MAXINT+1,))
146        self.assertRaises(OverflowError, xmlrpclib.dumps,
147                          (xmlrpclib.MININT-1,))
148
149        def dummy_write(s):
150            pass
151
152        m = xmlrpclib.Marshaller()
153        m.dump_int(xmlrpclib.MAXINT, dummy_write)
154        m.dump_int(xmlrpclib.MININT, dummy_write)
155        self.assertRaises(OverflowError, m.dump_int,
156                          xmlrpclib.MAXINT+1, dummy_write)
157        self.assertRaises(OverflowError, m.dump_int,
158                          xmlrpclib.MININT-1, dummy_write)
159
160    def test_dump_double(self):
161        xmlrpclib.dumps((float(2 ** 34),))
162        xmlrpclib.dumps((float(xmlrpclib.MAXINT),
163                         float(xmlrpclib.MININT)))
164        xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42),
165                         float(xmlrpclib.MININT - 42)))
166
167        def dummy_write(s):
168            pass
169
170        m = xmlrpclib.Marshaller()
171        m.dump_double(xmlrpclib.MAXINT, dummy_write)
172        m.dump_double(xmlrpclib.MININT, dummy_write)
173        m.dump_double(xmlrpclib.MAXINT + 42, dummy_write)
174        m.dump_double(xmlrpclib.MININT - 42, dummy_write)
175
176    def test_dump_none(self):
177        value = alist + [None]
178        arg1 = (alist + [None],)
179        strg = xmlrpclib.dumps(arg1, allow_none=True)
180        self.assertEqual(value,
181                          xmlrpclib.loads(strg)[0][0])
182        self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
183
184    def test_dump_encoding(self):
185        value = {'key\u20ac\xa4':
186                 'value\u20ac\xa4'}
187        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15')
188        strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg
189        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
190        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
191        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
192
193        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
194                               methodresponse=True)
195        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
196        strg = strg.encode('iso-8859-15', 'xmlcharrefreplace')
197        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
198
199        methodname = 'method\u20ac\xa4'
200        strg = xmlrpclib.dumps((value,), encoding='iso-8859-15',
201                               methodname=methodname)
202        self.assertEqual(xmlrpclib.loads(strg)[0][0], value)
203        self.assertEqual(xmlrpclib.loads(strg)[1], methodname)
204
205    def test_dump_bytes(self):
206        sample = b"my dog has fleas"
207        self.assertEqual(sample, xmlrpclib.Binary(sample))
208        for type_ in bytes, bytearray, xmlrpclib.Binary:
209            value = type_(sample)
210            s = xmlrpclib.dumps((value,))
211
212            result, m = xmlrpclib.loads(s, use_builtin_types=True)
213            (newvalue,) = result
214            self.assertEqual(newvalue, sample)
215            self.assertIs(type(newvalue), bytes)
216            self.assertIsNone(m)
217
218            result, m = xmlrpclib.loads(s, use_builtin_types=False)
219            (newvalue,) = result
220            self.assertEqual(newvalue, sample)
221            self.assertIs(type(newvalue), xmlrpclib.Binary)
222            self.assertIsNone(m)
223
224    def test_loads_unsupported(self):
225        ResponseError = xmlrpclib.ResponseError
226        data = '<params><param><value><spam/></value></param></params>'
227        self.assertRaises(ResponseError, xmlrpclib.loads, data)
228        data = ('<params><param><value><array>'
229                '<value><spam/></value>'
230                '</array></value></param></params>')
231        self.assertRaises(ResponseError, xmlrpclib.loads, data)
232        data = ('<params><param><value><struct>'
233                '<member><name>a</name><value><spam/></value></member>'
234                '<member><name>b</name><value><spam/></value></member>'
235                '</struct></value></param></params>')
236        self.assertRaises(ResponseError, xmlrpclib.loads, data)
237
238    def check_loads(self, s, value, **kwargs):
239        dump = '<params><param><value>%s</value></param></params>' % s
240        result, m = xmlrpclib.loads(dump, **kwargs)
241        (newvalue,) = result
242        self.assertEqual(newvalue, value)
243        self.assertIs(type(newvalue), type(value))
244        self.assertIsNone(m)
245
246    def test_load_standard_types(self):
247        check = self.check_loads
248        check('string', 'string')
249        check('<string>string</string>', 'string')
250        check('<string>�������������� string</string>', '�������������� string')
251        check('<int>2056183947</int>', 2056183947)
252        check('<int>-2056183947</int>', -2056183947)
253        check('<i4>2056183947</i4>', 2056183947)
254        check('<double>46093.78125</double>', 46093.78125)
255        check('<boolean>0</boolean>', False)
256        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
257              xmlrpclib.Binary(b'\x00byte string\xff'))
258        check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>',
259              b'\x00byte string\xff', use_builtin_types=True)
260        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
261              xmlrpclib.DateTime('20050210T11:41:23'))
262        check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>',
263              datetime.datetime(2005, 2, 10, 11, 41, 23),
264              use_builtin_types=True)
265        check('<array><data>'
266              '<value><int>1</int></value><value><int>2</int></value>'
267              '</data></array>', [1, 2])
268        check('<struct>'
269              '<member><name>b</name><value><int>2</int></value></member>'
270              '<member><name>a</name><value><int>1</int></value></member>'
271              '</struct>', {'a': 1, 'b': 2})
272
273    def test_load_extension_types(self):
274        check = self.check_loads
275        check('<nil/>', None)
276        check('<ex:nil/>', None)
277        check('<i1>205</i1>', 205)
278        check('<i2>20561</i2>', 20561)
279        check('<i8>9876543210</i8>', 9876543210)
280        check('<biginteger>98765432100123456789</biginteger>',
281              98765432100123456789)
282        check('<float>93.78125</float>', 93.78125)
283        check('<bigdecimal>9876543210.0123456789</bigdecimal>',
284              decimal.Decimal('9876543210.0123456789'))
285
286    def test_get_host_info(self):
287        # see bug #3613, this raised a TypeError
288        transp = xmlrpc.client.Transport()
289        self.assertEqual(transp.get_host_info("user@host.tld"),
290                          ('host.tld',
291                           [('Authorization', 'Basic dXNlcg==')], {}))
292
293    def test_ssl_presence(self):
294        try:
295            import ssl
296        except ImportError:
297            has_ssl = False
298        else:
299            has_ssl = True
300        try:
301            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
302        except NotImplementedError:
303            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
304        except OSError:
305            self.assertTrue(has_ssl)
306
307    def test_keepalive_disconnect(self):
308        class RequestHandler(http.server.BaseHTTPRequestHandler):
309            protocol_version = "HTTP/1.1"
310            handled = False
311
312            def do_POST(self):
313                length = int(self.headers.get("Content-Length"))
314                self.rfile.read(length)
315                if self.handled:
316                    self.close_connection = True
317                    return
318                response = xmlrpclib.dumps((5,), methodresponse=True)
319                response = response.encode()
320                self.send_response(http.HTTPStatus.OK)
321                self.send_header("Content-Length", len(response))
322                self.end_headers()
323                self.wfile.write(response)
324                self.handled = True
325                self.close_connection = False
326
327            def log_message(self, format, *args):
328                # don't clobber sys.stderr
329                pass
330
331        def run_server():
332            server.socket.settimeout(float(1))  # Don't hang if client fails
333            server.handle_request()  # First request and attempt at second
334            server.handle_request()  # Retried second request
335
336        server = http.server.HTTPServer((support.HOST, 0), RequestHandler)
337        self.addCleanup(server.server_close)
338        thread = threading.Thread(target=run_server)
339        thread.start()
340        self.addCleanup(thread.join)
341        url = "http://{}:{}/".format(*server.server_address)
342        with xmlrpclib.ServerProxy(url) as p:
343            self.assertEqual(p.method(), 5)
344            self.assertEqual(p.method(), 5)
345
346
class SimpleXMLRPCDispatcherTestCase(unittest.TestCase):
    class DispatchExc(Exception):
        """Raised inside the dispatched functions when checking for
        chained exceptions"""

    def _dispatch_and_catch(self, dispatcher, method, params):
        # Dispatch the call, insist that DispatchExc escapes, and hand the
        # caught exception back for inspection.
        with self.assertRaises(self.DispatchExc) as caught:
            dispatcher._dispatch(method, params)
        return caught.exception

    def _assert_unchained(self, exc):
        # The dispatcher must not attach another exception to the one
        # raised by the dispatched callable.
        self.assertIsNone(exc.__cause__)
        self.assertIsNone(exc.__context__)

    def test_call_registered_func(self):
        """Calls explicitly registered function"""
        args = (1, 2, 3)

        def dispatched_func(*received):
            raise self.DispatchExc(received)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(dispatched_func)
        raised = self._dispatch_and_catch(disp, 'dispatched_func', args)
        self.assertEqual(raised.args, (args,))
        self._assert_unchained(raised)

    def test_call_instance_func(self):
        """Calls a registered instance attribute as a function"""
        args = (1, 2, 3)
        exc_type = self.DispatchExc

        class DispatchedClass:
            def dispatched_func(self, *received):
                raise exc_type(received)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(DispatchedClass())
        raised = self._dispatch_and_catch(disp, 'dispatched_func', args)
        self.assertEqual(raised.args, (args,))
        self._assert_unchained(raised)

    def test_call_dispatch_func(self):
        """Calls the registered instance's `_dispatch` function"""
        name = 'method'
        args = (1, 2, 3)
        exc_type = self.DispatchExc

        class TestInstance:
            def _dispatch(self, method, params):
                raise exc_type(method, params)

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(TestInstance())
        raised = self._dispatch_and_catch(disp, name, args)
        self.assertEqual(raised.args, (name, args))
        self._assert_unchained(raised)

    def test_registered_func_is_none(self):
        """Calls explicitly registered function which is None"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_function(None, name='method')
        self.assertRaisesRegex(Exception, 'method',
                               disp._dispatch, 'method', ('param',))

    def test_instance_has_no_func(self):
        """Attempts to call nonexistent function on a registered instance"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        disp.register_instance(object())
        self.assertRaisesRegex(Exception, 'method',
                               disp._dispatch, 'method', ('param',))

    def test_cannot_locate_func(self):
        """Calls a function that the dispatcher cannot locate"""

        disp = xmlrpc.server.SimpleXMLRPCDispatcher()
        self.assertRaisesRegex(Exception, 'method',
                               disp._dispatch, 'method', ('param',))
432
433
class HelperTestCase(unittest.TestCase):
    def test_escape(self):
        # escape() must replace each XML-special character by its entity.
        expectations = (
            ("a&b", "a&amp;b"),
            ("a<b", "a&lt;b"),
            ("a>b", "a&gt;b"),
        )
        for raw, escaped in expectations:
            self.assertEqual(xmlrpclib.escape(raw), escaped)
439
class FaultTestCase(unittest.TestCase):
    def test_repr(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')
        self.assertEqual(repr(fault), "<Fault 42: 'Test Fault'>")
        # str() and repr() of a Fault are the same text.
        self.assertEqual(repr(fault), str(fault))

    def test_dump_fault(self):
        fault = xmlrpclib.Fault(42, 'Test Fault')

        # Passed as an ordinary parameter, a Fault comes back as a struct.
        decoded, method = xmlrpclib.loads(xmlrpclib.dumps((fault,)))
        (as_struct,) = decoded
        self.assertEqual(as_struct,
                         {'faultCode': 42, 'faultString': 'Test Fault'})
        self.assertEqual(method, None)

        # Marshalled as the payload itself, loading it raises the Fault.
        payload = xmlrpclib.Marshaller().dumps(fault)
        with self.assertRaises(xmlrpclib.Fault):
            xmlrpclib.loads(payload)

    def test_dotted_attribute(self):
        resolve = xmlrpc.server.resolve_dotted_attribute
        # Names starting with an underscore are treated as private and
        # must be refused with AttributeError.
        with self.assertRaises(AttributeError):
            resolve(str, '__add')
        self.assertTrue(resolve(str, 'title'))
462
463class DateTimeTestCase(unittest.TestCase):
464    def test_default(self):
465        with mock.patch('time.localtime') as localtime_mock:
466            time_struct = time.struct_time(
467                [2013, 7, 15, 0, 24, 49, 0, 196, 0])
468            localtime_mock.return_value = time_struct
469            localtime = time.localtime()
470            t = xmlrpclib.DateTime()
471            self.assertEqual(str(t),
472                             time.strftime("%Y%m%dT%H:%M:%S", localtime))
473
474    def test_time(self):
475        d = 1181399930.036952
476        t = xmlrpclib.DateTime(d)
477        self.assertEqual(str(t),
478                         time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d)))
479
480    def test_time_tuple(self):
481        d = (2007,6,9,10,38,50,5,160,0)
482        t = xmlrpclib.DateTime(d)
483        self.assertEqual(str(t), '20070609T10:38:50')
484
485    def test_time_struct(self):
486        d = time.localtime(1181399930.036952)
487        t = xmlrpclib.DateTime(d)
488        self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d))
489
490    def test_datetime_datetime(self):
491        d = datetime.datetime(2007,1,2,3,4,5)
492        t = xmlrpclib.DateTime(d)
493        self.assertEqual(str(t), '20070102T03:04:05')
494
495    def test_repr(self):
496        d = datetime.datetime(2007,1,2,3,4,5)
497        t = xmlrpclib.DateTime(d)
498        val ="<DateTime '20070102T03:04:05' at %#x>" % id(t)
499        self.assertEqual(repr(t), val)
500
501    def test_decode(self):
502        d = ' 20070908T07:11:13  '
503        t1 = xmlrpclib.DateTime()
504        t1.decode(d)
505        tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13))
506        self.assertEqual(t1, tref)
507
508        t2 = xmlrpclib._datetime(d)
509        self.assertEqual(t2, tref)
510
511    def test_comparison(self):
512        now = datetime.datetime.now()
513        dtime = xmlrpclib.DateTime(now.timetuple())
514
515        # datetime vs. DateTime
516        self.assertTrue(dtime == now)
517        self.assertTrue(now == dtime)
518        then = now + datetime.timedelta(seconds=4)
519        self.assertTrue(then >= dtime)
520        self.assertTrue(dtime < then)
521
522        # str vs. DateTime
523        dstr = now.strftime("%Y%m%dT%H:%M:%S")
524        self.assertTrue(dtime == dstr)
525        self.assertTrue(dstr == dtime)
526        dtime_then = xmlrpclib.DateTime(then.timetuple())
527        self.assertTrue(dtime_then >= dstr)
528        self.assertTrue(dstr < dtime_then)
529
530        # some other types
531        dbytes = dstr.encode('ascii')
532        dtuple = now.timetuple()
533        with self.assertRaises(TypeError):
534            dtime == 1970
535        with self.assertRaises(TypeError):
536            dtime != dbytes
537        with self.assertRaises(TypeError):
538            dtime == bytearray(dbytes)
539        with self.assertRaises(TypeError):
540            dtime != dtuple
541        with self.assertRaises(TypeError):
542            dtime < float(1970)
543        with self.assertRaises(TypeError):
544            dtime > dbytes
545        with self.assertRaises(TypeError):
546            dtime <= bytearray(dbytes)
547        with self.assertRaises(TypeError):
548            dtime >= dtuple
549
class BinaryTestCase(unittest.TestCase):

    # XXX What should str(Binary(b"\xff")) return?  I'm choosing "\xff"
    # for now (i.e. interpreting the binary data as Latin-1-encoded
    # text).  But this feels very unsatisfactory.  Perhaps we should
    # only define repr(), and return r"Binary(b'\xff')" instead?

    def test_default(self):
        # A Binary created without data renders as the empty string.
        self.assertEqual(str(xmlrpclib.Binary()), '')

    def test_string(self):
        raw = b'\x01\x02\x03abc123\xff\xfe'
        self.assertEqual(str(xmlrpclib.Binary(raw)), raw.decode("latin-1"))

    def test_decode(self):
        raw = b'\x01\x02\x03abc123\xff\xfe'
        expected = raw.decode("latin-1")
        encoded = base64.encodebytes(raw)

        # Decoding into an existing, empty Binary ...
        in_place = xmlrpclib.Binary()
        in_place.decode(encoded)
        self.assertEqual(str(in_place), expected)

        # ... and through the module-level _binary() helper.
        self.assertEqual(str(xmlrpclib._binary(encoded)), expected)
575
576
# Address of the currently running test server.  The server helper
# functions in this file fill these in once their socket is bound.
ADDR = None
PORT = None
URL = None
578
579# The evt is set twice.  First when the server is ready to serve.
580# Second when the server has been shutdown.  The user must clear
581# the event after it has been set the first time to catch the second set.
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Serve up to *numrequests* XML-RPC requests on an ephemeral port.

    The bound address is published through the module globals ADDR, PORT
    and URL.  *evt* is set once the server is ready to serve and set again
    when the function exits, so a caller that wants to observe the second
    set has to clear the event after the first one.  *requestHandler*
    defaults to SimpleXMLRPCRequestHandler; *encoding* is passed on to the
    server.
    """
    global ADDR, PORT, URL

    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            return 'This is the div function' if name == 'div' else None

        class Fixture:
            @staticmethod
            def getData():
                return '42'

    class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
        def get_request(self):
            # Put every accepted connection into blocking mode.  On Linux,
            # socket attributes are not inherited like they are on *BSD
            # and Windows.
            conn, client = self.socket.accept()
            conn.setblocking(True)
            return conn, client

    handler = requestHandler or xmlrpc.server.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(
        ("localhost", 0),
        handler,
        encoding=encoding,
        logRequests=False,
        bind_and_activate=False,
    )
    try:
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Publish the IP address rather than "localhost".  This avoids
        # socket.create_connection() trying every address family for
        # "localhost", which is slow e.g. on Vista which supports
        # AF_INET6; the server listens on AF_INET only.
        URL = "http://%s:%d" % (ADDR, PORT)
        serv.server_activate()

        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x: x, 'têšt')

        # register_function() used as a bare decorator ...
        @serv.register_function
        def my_function():
            '''This is my function'''
            return True

        # ... and as a decorator that names the exposed method.
        @serv.register_function(name='add')
        def _add(x, y):
            return x + y

        serv.register_instance(TestInstanceClass(), allow_dotted_names=True)
        evt.set()

        # handle up to 'numrequests' requests
        for _ in range(numrequests):
            serv.handle_request()

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
645
def http_multi_server(evt, numrequests, requestHandler=None):
    """Serve up to *numrequests* requests from a multi-path XML-RPC server.

    Dispatchers with introspection and multicall support are installed at
    "/foo" (which also exposes pow) and "/foo/bar" (which also exposes
    add); "/is/broken" gets a dispatcher that always raises RuntimeError.
    The bound address is published through the module globals ADDR, PORT
    and URL.

    *evt* is set once the server is ready to serve and set again when the
    function exits.  If setting up the server fails, the socket is still
    closed and *evt* is set (once) before the exception propagates.
    *requestHandler* defaults to SimpleXMLRPCRequestHandler.
    """
    global ADDR, PORT, URL

    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking.  On Linux,
            # socket attributes are not inherited like they are on *BSD
            # and Windows.
            s, port = self.socket.accept()
            s.setblocking(True)
            return s, port

    if not requestHandler:
        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler
    class MyRequestHandler(requestHandler):
        # No fixed list of valid paths: the paths of interest are the ones
        # registered through add_dispatcher() below.
        rpc_paths = []

    class BrokenDispatcher:
        def _marshaled_dispatch(self, data, dispatch_method=None, path=None):
            raise RuntimeError("broken dispatcher")

    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler,
                          logRequests=False, bind_and_activate=False)
    try:
        # Timeout and bind happen inside the try block (as in http_server)
        # so that a failure here still runs the cleanup below.
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        #connect to IP address directly.  This avoids socket.create_connection()
        #trying to connect to "localhost" using all address families, which
        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens
        #on AF_INET only.
        URL = "http://%s:%d"%(ADDR, PORT)
        serv.server_activate()
        paths = ["/foo", "/foo/bar"]
        for path in paths:
            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher())
            d.register_introspection_functions()
            d.register_multicall_functions()
        serv.get_dispatcher(paths[0]).register_function(pow)
        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add')
        serv.add_dispatcher("/is/broken", BrokenDispatcher())
        evt.set()

        # handle up to 'numrequests' requests
        while numrequests > 0:
            serv.handle_request()
            numrequests -= 1

    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
710
711# This function prevents errors like:
712#    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>
def is_unavailable_exception(e):
    '''Returns True if the given ProtocolError is the product of a server-side
       exception caused by the 'temporarily unavailable' response sometimes
       given by operations on non-blocking sockets.

       Exceptions without errcode/headers attributes (e.g. OSError) are
       judged by their message text instead.  Always returns a bool.'''

    # sometimes we get a -1 error code and/or empty headers
    try:
        if e.errcode == -1 or e.headers is None:
            return True
        exc_mess = e.headers.get('X-exception')
    except AttributeError:
        # Not a ProtocolError (e.g. an OSError): there are no headers to
        # inspect, so fall back to the exception's own message.
        exc_mess = str(e)

    # Explicit bool so the function never falls off the end with None.
    return bool(exc_mess) and 'temporarily unavailable' in exc_mess.lower()
729
def make_request_and_skipIf(condition, reason):
    """Return a decorator that skips a server test when *condition* is true.

    If *condition* is false the decorated function is returned unchanged.
    Otherwise it is replaced by a wrapper that first sends one request to
    the server at URL and then raises unittest.SkipTest(reason).
    """
    # Function-local import: functools is only needed by this helper.
    import functools

    # If we skip the test, we have to make a request because
    # the server created in setUp blocks expecting one to come in.
    if not condition:
        return lambda func: func
    def decorator(func):
        # Keep the wrapped test's name and docstring on the replacement.
        @functools.wraps(func)
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, OSError) as e:
                # Only 'temporarily unavailable' style failures are ignored.
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
745
class BaseServerTestCase(unittest.TestCase):
    # Knobs for subclasses: the request handler class handed to the server
    # function, how many requests the server should handle, and the server
    # function itself (run in a helper thread by setUp).
    requestHandler = None
    request_count = 1
    threadFunc = staticmethod(http_server)

    def setUp(self):
        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        ready = self.evt = threading.Event()

        # Run the server in its own thread; the thread is joined when the
        # test's cleanups run.
        server_thread = threading.Thread(
            target=self.threadFunc,
            args=(ready, self.request_count, self.requestHandler),
        )
        server_thread.start()
        self.addCleanup(server_thread.join)

        # Block until the server reports it is ready, then re-arm the
        # event so tearDown can wait for the next time it is set.
        ready.wait()
        ready.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()

        # disable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
772
class SimpleServerTestCase(BaseServerTestCase):
    """Basic calls, encodings, introspection and multicall requests made
    against the server thread that BaseServerTestCase.setUp() starts.
    """

    def _fail_unless_unavailable(self, exc):
        # Common handling for a ProtocolError/OSError raised by a request.
        # Failures due to non-blocking socket 'unavailable' errors are
        # ignored; anything else fails the test, and the message includes
        # the response headers when the exception carries them.
        if not is_unavailable_exception(exc):
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_client_encoding(self):
        start_string = '\u20ac'
        end_string = '\xa4'

        try:
            p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15')
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_nonascii_methodname(self):
        try:
            p = xmlrpclib.ServerProxy(URL, encoding='ascii')
            self.assertEqual(p.têšt(42), 42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_404(self):
        # send POST with http.client, it should return 404 header and
        # 'Not Found' message.
        with contextlib.closing(http.client.HTTPConnection(ADDR, PORT)) as conn:
            conn.request('POST', '/this-is-not-valid')
            response = conn.getresponse()

        self.assertEqual(response.status, 404)
        self.assertEqual(response.reason, 'Not Found')

    def test_introspection1(self):
        expected_methods = {'pow', 'div', 'my_function', 'add', 'têšt',
                            'system.listMethods', 'system.methodHelp',
                            'system.methodSignature', 'system.multicall',
                            'Fixture'}
        try:
            p = xmlrpclib.ServerProxy(URL)
            meth = p.system.listMethods()
            self.assertEqual(set(meth), expected_methods)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection2(self):
        try:
            # test _methodHelp()
            p = xmlrpclib.ServerProxy(URL)
            divhelp = p.system.methodHelp('div')
            self.assertEqual(divhelp, 'This is the div function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    @make_request_and_skipIf(sys.flags.optimize >= 2,
                     "Docstrings are omitted with -O2 and above")
    def test_introspection3(self):
        try:
            # test native doc
            p = xmlrpclib.ServerProxy(URL)
            myfunction = p.system.methodHelp('my_function')
            self.assertEqual(myfunction, 'This is my function')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_introspection4(self):
        # the SimpleXMLRPCServer doesn't support signatures, but
        # at least check that we can try making the call
        try:
            p = xmlrpclib.ServerProxy(URL)
            divsig = p.system.methodSignature('div')
            self.assertEqual(divsig, 'signatures not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            self._fail_unless_unavailable(e)

    def test_dotted_attribute(self):
        # Raises an AttributeError because private methods are not allowed.
        self.assertRaises(AttributeError,
                          xmlrpc.server.resolve_dotted_attribute, str, '__add')

        self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
        # Get the test to run faster by sending a request with test_simple1.
        # This avoids waiting for the socket timeout.
        self.test_simple1()

    def test_allow_dotted_names_true(self):
        # XXX also need allow_dotted_names_false test.
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        data = server.Fixture.getData()
        self.assertEqual(data, '42')

    def test_unicode_host(self):
        server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT))
        self.assertEqual(server.add("a", "\xe9"), "a\xe9")

    def test_partial_post(self):
        # Check that a partial POST doesn't make the server loop: issue #14001.
        # The request announces 100 bytes of content but sends fewer.
        request = ('POST /RPC2 HTTP/1.0\r\n'
                   'Content-Length: 100\r\n\r\n'
                   'bye HTTP/1.1\r\n'
                   f'Host: {ADDR}:{PORT}\r\n'
                   'Accept-Encoding: identity\r\n'
                   'Content-Length: 0\r\n\r\n')
        with contextlib.closing(socket.create_connection((ADDR, PORT))) as conn:
            conn.send(request.encode('ascii'))

    def test_context_manager(self):
        with xmlrpclib.ServerProxy(URL) as server:
            server.add(2, 3)
            # a connection is held while the with block is active ...
            self.assertNotEqual(server('transport')._connection,
                                (None, None))
        # ... and dropped once the block has been left
        self.assertEqual(server('transport')._connection,
                         (None, None))

    def test_context_manager_method_error(self):
        try:
            with xmlrpclib.ServerProxy(URL) as server:
                server.add(2, "a")
        except xmlrpclib.Fault:
            # A Fault from the call is tolerated on purpose; what is being
            # checked is the connection state after leaving the with block.
            pass
        self.assertEqual(server('transport')._connection,
                         (None, None))
970
971
class SimpleServerEncodingTestCase(BaseServerTestCase):
    @staticmethod
    def threadFunc(evt, numrequests, requestHandler=None, encoding=None):
        # The encoding argument is accepted but not consulted: the server is
        # always started with iso-8859-15.
        http_server(evt, numrequests, requestHandler, 'iso-8859-15')

    def test_server_encoding(self):
        first, second = '\u20ac', '\xa4'

        try:
            proxy = xmlrpclib.ServerProxy(URL)
            joined = proxy.add(first, second)
            self.assertEqual(joined, first + second)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            if is_unavailable_exception(exc):
                # failures due to non-blocking socket unavailable errors
                # are ignored
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))
990
991
class MultiPathServerTestCase(BaseServerTestCase):
    """Requests against the server started by http_multi_server, addressed
    through different URL paths.
    """
    threadFunc = staticmethod(http_multi_server)
    request_count = 2

    def test_path1(self):
        # under /foo, pow() answers while add() yields a Fault
        proxy = xmlrpclib.ServerProxy(URL+"/foo")
        self.assertEqual(proxy.pow(6,8), 6**8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)

    def test_path2(self):
        # under /foo/bar, add() answers while pow() yields a Fault
        proxy = xmlrpclib.ServerProxy(URL+"/foo/bar")
        self.assertEqual(proxy.add(6,8), 6+8)
        with self.assertRaises(xmlrpclib.Fault):
            proxy.pow(6, 8)

    def test_path3(self):
        # under /is/broken, add() yields a Fault
        proxy = xmlrpclib.ServerProxy(URL+"/is/broken")
        with self.assertRaises(xmlrpclib.Fault):
            proxy.add(6, 8)
1008
#Shared fixture for the keep-alive test cases: an HTTP/1.1 request handler
#that logs the request lines it serves, one list per handle() call
class BaseKeepaliveServerTestCase(BaseServerTestCase):
    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        #keep-alive capable (HTTP/1.1) handler; every handle() call opens a
        #new entry in the class-level myRequests log, and each request line
        #read during that call is appended to that entry
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'
        myRequests = []

        def handle(self):
            log = self.myRequests
            log.append([])
            #remember which entry belongs to this handle() call
            self.reqidx = len(log) - 1
            return self.parentClass.handle(self)

        def handle_one_request(self):
            outcome = self.parentClass.handle_one_request(self)
            self.myRequests[self.reqidx].append(self.raw_requestline)
            return outcome

    requestHandler = RequestHandler

    def setUp(self):
        #start every test with an empty request log
        self.RequestHandler.myRequests = []
        return super().setUp()
1032
1033#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism
1034#does indeed serve subsequent requests on the same connection
class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase):
    def test_two(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #three calls in a row through the same proxy
        for _ in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")()

        request_log = self.RequestHandler.myRequests

        #a single request handler should have served all of them
        self.assertEqual(len(request_log), 1)

        #only insist on two logged requests: the append for the third may
        #still be pending due to thread scheduling
        self.assertGreaterEqual(len(request_log[-1]), 2)
1050
1051
1052#test special attribute access on the serverproxy, through the __call__
1053#function.
class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase):
    #the server is asked to handle two keepalive requests
    request_count=2

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(URL)
        #first batch of requests
        for _ in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")() #this should trigger a new keep-alive request
        #second batch of requests
        for _ in range(3):
            self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")()

        #two request handlers are expected, each having logged at least
        #two complete requests
        request_log = self.RequestHandler.myRequests
        self.assertEqual(len(request_log), 2)
        self.assertGreaterEqual(len(request_log[-1]), 2)
        self.assertGreaterEqual(len(request_log[-2]), 2)


    def test_transport(self):
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6,8), 6**8)
        #closing via the transport object; same effect as proxy("close")()
        transport = proxy("transport")
        transport.close()
        self.assertEqual(proxy.pow(6,8), 6**8)
        proxy("close")()
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
1085
1086#A test case that verifies that gzip encoding works in both directions
1087#(for a request and the response)
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipServerTestCase(BaseServerTestCase):
    """Gzip encoding of request bodies and of response bodies."""

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        #an HTTP/1.1 request handler that stores the Content-Length of the
        #most recent POST in a class variable
        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler
        protocol_version = 'HTTP/1.1'

        def do_POST(self):
            #store content length of last request in class
            self.__class__.content_length = int(self.headers["content-length"])
            return self.parentClass.do_POST(self)
    requestHandler = RequestHandler

    class Transport(xmlrpclib.Transport):
        #custom transport, stores the response length for our perusal
        fake_gzip = False
        def parse_response(self, response):
            self.response_length=int(response.getheader("content-length", 0))
            return xmlrpclib.Transport.parse_response(self, response)

        def send_content(self, connection, body):
            if self.fake_gzip:
                #add a lone gzip header to induce decode error remotely
                connection.putheader("Content-Encoding", "gzip")
            return xmlrpclib.Transport.send_content(self, connection, body)

    def test_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None #no request encoding
        p = xmlrpclib.ServerProxy(URL, transport=t)
        self.assertEqual(p.pow(6,8), 6**8)
        plain_length = self.RequestHandler.content_length
        t.encode_threshold = 0 #turn on request encoding
        self.assertEqual(p.pow(6,8), 6**8)
        gzipped_length = self.RequestHandler.content_length
        #close before comparing, as test_gzip_response does, so that a
        #failed comparison cannot skip the close
        p("close")()
        self.assertGreater(plain_length, gzipped_length)

    def test_bad_gzip_request(self):
        t = self.Transport()
        t.encode_threshold = None
        t.fake_gzip = True
        p = xmlrpclib.ServerProxy(URL, transport=t)
        #the server is expected to answer the bogus encoding with a 400
        with self.assertRaisesRegex(xmlrpclib.ProtocolError,
                                    re.compile(r"\b400\b")):
            p.pow(6, 8)
        p("close")()

    def test_gzip_response(self):
        t = self.Transport()
        p = xmlrpclib.ServerProxy(URL, transport=t)
        handler = self.requestHandler
        old = handler.encode_threshold
        try:
            handler.encode_threshold = None #no encoding
            self.assertEqual(p.pow(6,8), 6**8)
            plain_length = t.response_length
            handler.encode_threshold = 0 #always encode
            self.assertEqual(p.pow(6,8), 6**8)
            p("close")()
            gzipped_length = t.response_length
        finally:
            #encode_threshold is a class attribute: put it back even when a
            #call or assertion above fails
            handler.encode_threshold = old
        self.assertGreater(plain_length, gzipped_length)
1154
1155
@unittest.skipIf(gzip is None, 'requires gzip')
class GzipUtilTestCase(unittest.TestCase):

    def test_gzip_decode_limit(self):
        limit = 20 * 1024 * 1024

        # a payload of exactly `limit` bytes round-trips
        at_limit = xmlrpclib.gzip_encode(b'\0' * limit)
        self.assertEqual(len(xmlrpclib.gzip_decode(at_limit)), limit)

        # one byte more is rejected ...
        over_limit = xmlrpclib.gzip_encode(b'\0' * (limit + 1))
        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(over_limit)

        # ... unless max_decode=-1 is passed
        xmlrpclib.gzip_decode(over_limit, max_decode=-1)
1174
1175
class HeadersServerTestCase(BaseServerTestCase):
    """Checks the headers the server sees when ServerProxy is given a
    `headers` argument.
    """

    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler):
        # headers of the most recent POST, kept on the class
        test_headers = None

        def do_POST(self):
            self.__class__.test_headers = self.headers
            return super().do_POST()

    requestHandler = RequestHandler
    # header names expected on every request
    standard_headers = [
        'Host', 'Accept-Encoding', 'Content-Type', 'User-Agent',
        'Content-Length']

    def setUp(self):
        self.RequestHandler.test_headers = None
        return super().setUp()

    def assertContainsAdditionalHeaders(self, headers, additional):
        # The header names must be exactly the standard ones plus the
        # additional ones ...
        wanted = sorted([*self.standard_headers, *additional])
        self.assertListEqual(sorted(headers.keys()), wanted)

        # ... and every additional header must carry the given value.
        for name in additional:
            self.assertEqual(headers.get(name), additional[name])

    def _headers_seen_by_server(self, headers):
        # Make one pow() call through a proxy configured with `headers` and
        # return what the request handler recorded for it.
        proxy = xmlrpclib.ServerProxy(URL, headers=headers)
        self.assertEqual(proxy.pow(6, 8), 6**8)
        return self.RequestHandler.test_headers

    def test_header(self):
        received = self._headers_seen_by_server([('X-Test', 'foo')])
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_many(self):
        received = self._headers_seen_by_server(
            [('X-Test', 'foo'), ('X-Test-Second', 'bar')])
        self.assertContainsAdditionalHeaders(
            received, {'X-Test': 'foo', 'X-Test-Second': 'bar'})

    def test_header_empty(self):
        received = self._headers_seen_by_server([])
        self.assertContainsAdditionalHeaders(received, {})

    def test_header_tuple(self):
        received = self._headers_seen_by_server((('X-Test', 'foo'),))
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})

    def test_header_items(self):
        received = self._headers_seen_by_server({'X-Test': 'foo'}.items())
        self.assertContainsAdditionalHeaders(received, {'X-Test': 'foo'})
1235
1236
1237#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        # Any string in the correct URL format will do; its actual value
        # doesn't matter.
        self.url = 'http://fake.localhost'

    def test_close(self):
        proxy = xmlrpclib.ServerProxy(self.url)
        close = proxy('close')
        self.assertIsNone(close())

    def test_transport(self):
        transport = xmlrpclib.Transport()
        proxy = xmlrpclib.ServerProxy(self.url, transport=transport)
        self.assertEqual(proxy('transport'), transport)
1253
1254
1255# This is a contrived way to make a failure occur on the server side
1256# in order to test the _send_traceback_header flag on the server
class FailingMessageClass(http.client.HTTPMessage):
    """HTTPMessage whose get() returns a non-numeric Content-Length."""

    def get(self, key, failobj=None):
        lowered = key.lower()
        if lowered != 'content-length':
            # every other header is looked up normally
            return super().get(lowered, failobj)
        return 'I am broken'
1263
1264
class FailingServerTestCase(unittest.TestCase):
    def setUp(self):
        self.evt = threading.Event()
        # serve a single request from a background thread
        server_thread = threading.Thread(target=http_server,
                                         args=(self.evt, 1))
        server_thread.start()
        self.addCleanup(server_thread.join)

        # block until the server is ready, then re-arm the event for
        # tearDown()
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # wait on the server thread to terminate
        self.evt.wait()
        # undo whatever the individual tests changed: the traceback flag
        # and the message class
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = \
            http.client.HTTPMessage

    def test_basic(self):
        # the flag must be off by default
        self.assertEqual(
            xmlrpc.server.SimpleXMLRPCServer._send_traceback_header, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # smoke test: a call that shouldn't fail
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            if is_unavailable_exception(exc):
                # non-blocking socket 'unavailable' errors are ignored
                return
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (exc, getattr(exc, "headers", "")))

    def test_fail_no_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # 'unavailable' socket errors and exceptions without headers
            # are not inspected any further
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # The two server-side error headers shouldn't be sent back in this case
            self.assertIsNone(exc.headers.get("X-exception"))
            self.assertIsNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')

    def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # With the flag set, errors in the server should send back
        # exception/traceback info
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as exc:
            # 'unavailable' socket errors and exceptions without headers
            # are not inspected any further
            if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
                return
            # We should get error info in the response
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(exc.headers.get("X-exception"), expected_err)
            self.assertIsNotNone(exc.headers.get("X-traceback"))
        else:
            self.fail('ProtocolError not raised')
1341
1342
@contextlib.contextmanager
def captured_stdout(encoding='utf-8'):
    """Temporarily replace sys.stdout with an in-memory text stream.

    Unlike support.captured_stdout(), the stream that is installed (and
    yielded) has a `buffer` attribute: it is a TextIOWrapper, using the
    given encoding, on top of a BytesIO.  The previous sys.stdout is put
    back when the block is left.
    """
    stream = io.TextIOWrapper(io.BytesIO(), encoding=encoding)
    with contextlib.redirect_stdout(stream):
        yield stream
1354
1355
class CGIHandlerTestCase(unittest.TestCase):
    """Drives CGIXMLRPCRequestHandler.handle_request() with a patched
    environment and captured standard streams.
    """

    def setUp(self):
        self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()

    def tearDown(self):
        self.cgi = None

    def test_cgi_get(self):
        with support.EnvironmentVarGuard() as env:
            env['REQUEST_METHOD'] = 'GET'
            # if the method is GET and no request_text is given, it runs handle_get
            # get sysout output
            with captured_stdout(encoding=self.cgi.encoding) as data_out:
                self.cgi.handle_request()

            # parse Status header
            data_out.seek(0)
            handle = data_out.read()
            status = handle.split()[1]
            message = ' '.join(handle.split()[2:4])

            self.assertEqual(status, '400')
            self.assertEqual(message, 'Bad Request')


    def test_cgi_xmlrpc_response(self):
        data = """<?xml version='1.0'?>
        <methodCall>
            <methodName>test_method</methodName>
            <params>
                <param>
                    <value><string>foo</string></value>
                </param>
                <param>
                    <value><string>bar</string></value>
                </param>
            </params>
        </methodCall>
        """

        with support.EnvironmentVarGuard() as env, \
             captured_stdout(encoding=self.cgi.encoding) as data_out, \
             support.captured_stdin() as data_in:
            data_in.write(data)
            data_in.seek(0)
            env['CONTENT_LENGTH'] = str(len(data))
            self.cgi.handle_request()
        data_out.seek(0)

        # will respond exception, if so, our goal is achieved ;)
        handle = data_out.read()

        # Locate the XML body by its declaration instead of assuming a
        # fixed header length: a hard-coded offset silently breaks as soon
        # as the number of digits in Content-Length changes.
        xml_start = handle.find("<?xml")
        self.assertGreaterEqual(xml_start, 0)
        content = handle[xml_start:]

        self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, content)

        # Also test the content-length returned by handle_request
        # Using the same test method in order to avoid all the data-passing
        # boilerplate code.
        # Test for bug: http://bugs.python.org/issue5040

        self.assertEqual(
            int(re.search(r'Content-Length: (\d+)', handle).group(1)),
            len(content))
1422
1423
class UseBuiltinTypesTestCase(unittest.TestCase):

    def test_use_builtin_types(self):
        # With use_builtin_types=True, SimpleXMLRPCDispatcher hands binary
        # data to the method as bytes and date/time values as
        # datetime.datetime.
        self.log = []
        payload = b"my dog has fleas"
        stamp = datetime.datetime(2008, 5, 26, 18, 25, 12)

        def foobar(*args):
            # record whatever the dispatcher passes in
            self.log.extend(args)

        dispatcher = xmlrpc.server.SimpleXMLRPCDispatcher(
            allow_none=True, encoding=None, use_builtin_types=True)
        dispatcher.register_function(foobar)
        dispatcher._marshaled_dispatch(
            xmlrpclib.dumps((payload, stamp), 'foobar'))

        self.assertEqual(len(self.log), 2)
        got_bytes, got_date = self.log
        self.assertEqual(self.log, [payload, stamp])
        self.assertIs(type(got_date), datetime.datetime)
        self.assertIs(type(got_bytes), bytes)

    def test_cgihandler_has_use_builtin_types_flag(self):
        cgi_handler = xmlrpc.server.CGIXMLRPCRequestHandler(
            use_builtin_types=True)
        self.assertTrue(cgi_handler.use_builtin_types)

    def test_xmlrpcserver_has_use_builtin_types_flag(self):
        address = ("localhost", 0)
        server = xmlrpc.server.SimpleXMLRPCServer(address,
                                                  use_builtin_types=True)
        # the socket is released before the flag is checked
        server.server_close()
        self.assertTrue(server.use_builtin_types)
1455
1456
@support.reap_threads
def test_main():
    # Hand every test case class of this module to support.run_unittest(),
    # in this order.
    test_classes = (
        XMLRPCTestCase,
        HelperTestCase,
        DateTimeTestCase,
        BinaryTestCase,
        FaultTestCase,
        UseBuiltinTypesTestCase,
        SimpleServerTestCase,
        SimpleServerEncodingTestCase,
        KeepaliveServerTestCase1,
        KeepaliveServerTestCase2,
        GzipServerTestCase,
        GzipUtilTestCase,
        HeadersServerTestCase,
        MultiPathServerTestCase,
        ServerProxyTestCase,
        FailingServerTestCase,
        CGIHandlerTestCase,
        SimpleXMLRPCDispatcherTestCase,
    )
    support.run_unittest(*test_classes)


# Run the whole suite when the file is executed as a script.
if __name__ == "__main__":
    test_main()
1470