# defusedxml
#
# Copyright (c) 2013 by Christian Heimes <christian@python.org>
# Licensed to PSF under a Contributor Agreement.
# See https://www.python.org/psf/license for licensing details.
"""Defused xmlrpclib

Also defuses gzip bomb
"""
from __future__ import print_function, absolute_import

import io

from .common import DTDForbidden, EntitiesForbidden, ExternalReferenceForbidden, PY3

if PY3:
    __origin__ = "xmlrpc.client"
    from xmlrpc.client import ExpatParser
    from xmlrpc import client as xmlrpc_client
    from xmlrpc import server as xmlrpc_server
    from xmlrpc.client import gzip_decode as _orig_gzip_decode
    from xmlrpc.client import GzipDecodedResponse as _OrigGzipDecodedResponse
else:
    __origin__ = "xmlrpclib"
    from xmlrpclib import ExpatParser
    import xmlrpclib as xmlrpc_client

    xmlrpc_server = None
    from xmlrpclib import gzip_decode as _orig_gzip_decode
    from xmlrpclib import GzipDecodedResponse as _OrigGzipDecodedResponse

try:
    import gzip
except ImportError:  # pragma: no cover
    gzip = None


# Limit maximum request size to prevent resource exhaustion DoS
# Also used to limit maximum amount of gzip decoded data in order to prevent
# decompression bombs
# A value of -1 or smaller disables the limit
MAX_DATA = 30 * 1024 * 1024  # 30 MB


def defused_gzip_decode(data, limit=None):
    """gzip encoded data -> unencoded data

    Decode data using the gzip content encoding as described in RFC 1952

    ``data`` is the gzip-compressed byte string.  ``limit`` is the maximum
    number of decoded bytes accepted; ``None`` selects ``MAX_DATA`` and a
    negative value disables the check.

    Raises ``NotImplementedError`` when the gzip module is unavailable and
    ``ValueError`` when decoding fails with an ``IOError`` or when the
    decoded payload is larger than ``limit``.
    """
    if not gzip:  # pragma: no cover
        raise NotImplementedError
    if limit is None:
        limit = MAX_DATA
    f = io.BytesIO(data)
    gzf = gzip.GzipFile(mode="rb", fileobj=f)
    try:
        if limit < 0:  # no limit
            decoded = gzf.read()
        else:
            # Ask for one byte more than allowed so that an oversized
            # payload can be detected without decoding all of it.
            decoded = gzf.read(limit + 1)
    except IOError:  # pragma: no cover
        raise ValueError("invalid data")
    finally:
        # Release both buffers on the error path as well.
        gzf.close()
        f.close()
    if limit >= 0 and len(decoded) > limit:
        raise ValueError("max gzipped payload length exceeded")
    return decoded


class DefusedGzipDecodedResponse(gzip.GzipFile if gzip else object):
    """a file-like object to decode a response encoded with the gzip
    method, as described in RFC 1952.

    Both the amount of compressed data taken from ``response`` and the
    amount of decoded data handed out by ``read()`` are checked against
    ``limit`` (``MAX_DATA`` when ``None``; a negative value disables the
    checks).
    """

    def __init__(self, response, limit=None):
        """Buffer ``response`` in memory and open it for gzip decoding.

        Raises ``NotImplementedError`` when the gzip module is unavailable
        and ``ValueError`` when the compressed payload exceeds ``limit``.
        """
        # response doesn't support tell() and read(), required by
        # GzipFile
        if not gzip:  # pragma: no cover
            raise NotImplementedError
        self.limit = limit = limit if limit is not None else MAX_DATA
        if limit < 0:  # no limit
            data = response.read()
            self.readlength = None
        else:
            # One extra byte is enough to tell "too large" from "at limit".
            data = response.read(limit + 1)
            # Running total of decoded bytes returned by read().
            self.readlength = 0
        if limit >= 0 and len(data) > limit:
            raise ValueError("max payload length exceeded")
        self.stringio = io.BytesIO(data)
        gzip.GzipFile.__init__(self, mode="rb", fileobj=self.stringio)

    def read(self, n=-1):
        """Read up to ``n`` decoded bytes; ``None`` or a negative ``n``
        means "read until EOF".

        With a limit in place a single call never decodes more than one
        byte past the remaining budget, and ``ValueError`` is raised once
        the total decoded length exceeds the limit.
        """
        if self.limit >= 0:
            left = self.limit - self.readlength
            if n is None or n < 0:
                # A "read everything" request must still be bounded,
                # otherwise the whole stream would be decompressed into
                # memory before the limit check below could fire.
                n = left + 1
            else:
                n = min(n, left + 1)
            data = gzip.GzipFile.read(self, n)
            self.readlength += len(data)
            if self.readlength > self.limit:
                raise ValueError("max payload length exceeded")
            return data
        else:
            return gzip.GzipFile.read(self, n)

    def close(self):
        """Close the gzip reader and the in-memory buffer behind it."""
        gzip.GzipFile.close(self)
        self.stringio.close()


class DefusedExpatParser(ExpatParser):
    """ExpatParser that installs raising handlers on its underlying parser.

    ``forbid_dtd`` rejects doctype declarations, ``forbid_entities``
    rejects entity declarations and ``forbid_external`` rejects external
    entity references.
    """

    def __init__(self, target, forbid_dtd=False, forbid_entities=True, forbid_external=True):
        ExpatParser.__init__(self, target)
        self.forbid_dtd = forbid_dtd
        self.forbid_entities = forbid_entities
        self.forbid_external = forbid_external
        parser = self._parser
        if self.forbid_dtd:
            parser.StartDoctypeDeclHandler = self.defused_start_doctype_decl
        if self.forbid_entities:
            parser.EntityDeclHandler = self.defused_entity_decl
            parser.UnparsedEntityDeclHandler = self.defused_unparsed_entity_decl
        if self.forbid_external:
            parser.ExternalEntityRefHandler = self.defused_external_entity_ref_handler

    def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
        """Doctype handler: always raises ``DTDForbidden``."""
        raise DTDForbidden(name, sysid, pubid)

    def defused_entity_decl(
        self, name, is_parameter_entity, value, base, sysid, pubid, notation_name
    ):
        """Entity declaration handler: always raises ``EntitiesForbidden``."""
        raise EntitiesForbidden(name, value, base, sysid, pubid, notation_name)

    def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
        """Unparsed entity handler: always raises ``EntitiesForbidden``."""
        # expat 1.2
        raise EntitiesForbidden(name, None, base, sysid, pubid, notation_name)  # pragma: no cover

    def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
        """External entity handler: always raises ``ExternalReferenceForbidden``."""
        raise ExternalReferenceForbidden(context, base, sysid, pubid)


def monkey_patch():
    """Install the defused parser and gzip helpers into the xmlrpc modules.

    The server module is patched only when it was imported (PY3 branch).
    """
    xmlrpc_client.FastParser = DefusedExpatParser
    xmlrpc_client.GzipDecodedResponse = DefusedGzipDecodedResponse
    xmlrpc_client.gzip_decode = defused_gzip_decode
    if xmlrpc_server:
        xmlrpc_server.gzip_decode = defused_gzip_decode


def unmonkey_patch():
    """Undo ``monkey_patch()``.

    ``FastParser`` is reset to ``None``; the gzip helpers are restored to
    the originals captured when this module was imported.
    """
    xmlrpc_client.FastParser = None
    xmlrpc_client.GzipDecodedResponse = _OrigGzipDecodedResponse
    xmlrpc_client.gzip_decode = _orig_gzip_decode
    if xmlrpc_server:
        xmlrpc_server.gzip_decode = _orig_gzip_decode