import unittest

try:
    import ConfigParser
except ImportError:
    # Python 3 renamed the module; keep the historical name used below.
    import configparser as ConfigParser

from impacket.dcerpc.v5.ndr import NDRCALL
from impacket.dcerpc.v5 import transport, epm, samr
from impacket.dcerpc.v5.dtypes import NULL
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY, \
    RPC_C_AUTHN_LEVEL_NONE, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_WINNT
from impacket.dcerpc.v5.dtypes import RPC_UNICODE_STRING


# Aimed at testing just the DCERPC engine, not the particular
# endpoints (we should do specific tests for endpoints).
# Here we're using EPM just because we need one, and it's the
# easiest one.

class DCERPCTests(unittest.TestCase):
    """Transport-independent DCERPC engine tests.

    Subclasses are expected to set, in ``setUp``: ``username``, ``password``,
    ``domain``, ``serverName``, ``machine``, ``hashes`` (``"LM:NT"``),
    ``aesKey128``, ``aesKey256`` and ``stringBinding``.
    """

    def connectDCE(self, username, password, domain, lm='', nt='', aesKey='', TGT=None, TGS=None, tfragment=0,
                   dceFragment=0,
                   auth_type=RPC_C_AUTHN_WINNT, auth_level=RPC_C_AUTHN_LEVEL_NONE, dceAuth=True, doKerberos=False,
                   bind=epm.MSRPC_UUID_PORTMAP):
        """Create a transport from ``self.stringBinding``, connect and bind.

        :param username, password, domain, lm, nt, aesKey, TGT, TGS:
            forwarded to the transport's ``set_credentials`` when available.
        :param tfragment: value given to the transport's ``set_max_fragment_size``.
        :param dceFragment: value given to the DCE object's ``set_max_fragment_size``.
        :param auth_type: value given to ``dce.set_auth_type``.
        :param auth_level: value given to ``dce.set_auth_level``.
        :param dceAuth: when ``True``, the transport credentials are also set
            on the DCE object before connecting.
        :param doKerberos: forwarded to the transport's ``set_kerberos``
            (``self.machine`` is used as the KDC host).
        :param bind: interface UUID passed to ``dce.bind``.
        :return: the connected and bound DCE object; the caller must
            disconnect it.
        """
        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)

        if hasattr(rpctransport, 'set_credentials'):
            # This method exists only for selected protocol sequences.
            rpctransport.set_credentials(username, password, domain, lm, nt, aesKey, TGT, TGS)
            rpctransport.set_kerberos(doKerberos, kdcHost=self.machine)

        rpctransport.set_max_fragment_size(tfragment)
        rpctransport.setRemoteName(self.serverName)
        rpctransport.setRemoteHost(self.machine)
        dce = rpctransport.get_dce_rpc()
        dce.set_max_fragment_size(dceFragment)
        if dceAuth is True:
            dce.set_credentials(*(rpctransport.get_credentials()))
        dce.connect()
        dce.set_auth_type(auth_type)
        dce.set_auth_level(auth_level)
        dce.bind(bind)

        return dce

    # ------------------------------------------------------------------
    # Private helpers shared by the tests below
    # ------------------------------------------------------------------

    def _loadConfig(self, section):
        """Populate the credential/target attributes from ``dcetests.cfg``.

        :param section: name of the configuration section to read.
        """
        configFile = ConfigParser.ConfigParser()
        configFile.read('dcetests.cfg')
        self.username = configFile.get(section, 'username')
        self.domain = configFile.get(section, 'domain')
        self.serverName = configFile.get(section, 'servername')
        self.password = configFile.get(section, 'password')
        self.machine = configFile.get(section, 'machine')
        self.hashes = configFile.get(section, 'hashes')
        self.aesKey256 = configFile.get(section, 'aesKey256')
        self.aesKey128 = configFile.get(section, 'aesKey128')

    @staticmethod
    def _disconnectQuietly(dce):
        """Disconnect ``dce``, ignoring any error raised while doing so.

        Only used while another exception is already propagating: a failure
        during teardown must not replace the error that caused it.
        """
        try:
            dce.disconnect()
        except Exception:
            pass

    def _disconnectAfter(self, dce, action):
        """Run ``action()`` and always disconnect ``dce`` afterwards.

        On success the disconnect is a regular call (its errors propagate).
        On failure the connection is closed best-effort and the original
        exception is re-raised unchanged.
        """
        try:
            action()
        except Exception:
            # Kept in a separate function so the bare ``raise`` below
            # re-raises the error coming from ``action``.
            self._disconnectQuietly(dce)
            raise
        dce.disconnect()

    @staticmethod
    def _eptLookupRequest():
        """Build the ``ept_lookup`` request used by most tests."""
        request = epm.ept_lookup()
        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
        request['object'] = NULL
        request['Ifid'] = NULL
        request['vers_option'] = epm.RPC_C_VERS_ALL
        request['max_ents'] = 499
        return request

    def _lookupAndDisconnect(self, dce, repeat=1, dump=False):
        """Send the ``ept_lookup`` request over ``dce`` and disconnect.

        :param repeat: how many times the same request object is sent.
        :param dump: when true, call ``dump()`` on the last response.
        """
        def exchange():
            request = self._eptLookupRequest()
            resp = None
            for _ in range(repeat):
                resp = dce.request(request)
            if dump:
                resp.dump()

        self._disconnectAfter(dce, exchange)

    def _heptLookupAndDisconnect(self, dce):
        """Call ``epm.hept_lookup`` on the target, then disconnect ``dce``."""
        self._disconnectAfter(dce, lambda: epm.hept_lookup(self.machine))

    def _anonLookup(self, auth_level):
        """Anonymous ``ept_lookup`` at ``auth_level``.

        ``STATUS_ACCESS_DENIED`` is tolerated only when the string binding
        is ``ncacn_np``; any other failure is re-raised.
        """
        try:
            dce = self.connectDCE('', '', '', auth_level=auth_level, dceAuth=False, doKerberos=False)
            self._lookupAndDisconnect(dce)
        except Exception as e:
            if not ('STATUS_ACCESS_DENIED' in str(e) and 'ncacn_np' in self.stringBinding):
                raise

    # ------------------------------------------------------------------
    # Connection / authentication
    # ------------------------------------------------------------------

    def test_connection(self):
        """Connect with a password, without DCE-level credentials."""
        dce = self.connectDCE(self.username, self.password, self.domain, dceAuth=False)
        dce.disconnect()

    def test_connectionHashes(self):
        """Connect with LM/NT hashes, without DCE-level credentials."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceAuth=False)
        dce.disconnect()

    def test_dceAuth(self):
        """Connect with a password and DCE-level credentials."""
        dce = self.connectDCE(self.username, self.password, self.domain, dceAuth=True)
        self._heptLookupAndDisconnect(dce)

    def test_dceAuthKerberos(self):
        """Same as test_dceAuth, with Kerberos enabled on the transport."""
        dce = self.connectDCE(self.username, self.password, self.domain, dceAuth=True, doKerberos=True)
        self._heptLookupAndDisconnect(dce)

    def test_dceAuthHasHashes(self):
        """Connect with LM/NT hashes and DCE-level credentials."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceAuth=True)
        self._heptLookupAndDisconnect(dce)

    def test_dceAuthHasHashesKerberos(self):
        """Same as test_dceAuthHasHashes, with Kerberos enabled."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceAuth=True, doKerberos=True)
        self._heptLookupAndDisconnect(dce)

    def test_dceAuthHasAes128Kerberos(self):
        """Kerberos connection using the configured AES-128 key."""
        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey128, dceAuth=True, doKerberos=True)
        self._heptLookupAndDisconnect(dce)

    def test_dceAuthHasAes256Kerberos(self):
        """Kerberos connection using the configured AES-256 key."""
        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey256, dceAuth=True, doKerberos=True)
        self._heptLookupAndDisconnect(dce)

    # ------------------------------------------------------------------
    # Fragmentation
    # ------------------------------------------------------------------

    def test_dceTransportFragmentation(self):
        """ept_lookup with the transport max fragment size set to 1."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, tfragment=1, dceAuth=True,
                              doKerberos=False)
        self._lookupAndDisconnect(dce)

    def test_dceFragmentation(self):
        """ept_lookup with the DCE max fragment size set to 1."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1, dceAuth=True,
                              doKerberos=False)
        self._lookupAndDisconnect(dce)

    def test_bigRequestMustFragment(self):
        """Send a 4500-character SAMR domain lookup over ncacn_ip_tcp.

        ``STATUS_NO_SUCH_DOMAIN`` is the tolerated outcome of the lookup.
        """
        # NOTE(review): dummyCall is not referenced below; kept as-is.
        class dummyCall(NDRCALL):
            opnum = 2
            structure = (
                ('Name', RPC_UNICODE_STRING),
            )

        lmhash, nthash = self.hashes.split(':')
        oldBinding = self.stringBinding
        self.stringBinding = epm.hept_map(self.machine, samr.MSRPC_UUID_SAMR, protocol='ncacn_ip_tcp')
        try:
            print(self.stringBinding)
            dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=0,
                                  auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
                                  dceAuth=True,
                                  doKerberos=True, bind=samr.MSRPC_UUID_SAMR)
        finally:
            # Restore the binding even when the connection attempt fails.
            self.stringBinding = oldBinding

        def exchange():
            request = samr.SamrConnect()
            request['ServerName'] = u'BETO\x00'
            request['DesiredAccess'] = (
                samr.DELETE | samr.READ_CONTROL | samr.WRITE_DAC | samr.WRITE_OWNER |
                samr.ACCESS_SYSTEM_SECURITY | samr.GENERIC_READ | samr.GENERIC_WRITE |
                samr.GENERIC_EXECUTE | samr.SAM_SERVER_CONNECT | samr.SAM_SERVER_SHUTDOWN |
                samr.SAM_SERVER_INITIALIZE | samr.SAM_SERVER_CREATE_DOMAIN |
                samr.SAM_SERVER_ENUMERATE_DOMAINS | samr.SAM_SERVER_LOOKUP_DOMAIN |
                samr.SAM_SERVER_READ | samr.SAM_SERVER_WRITE | samr.SAM_SERVER_EXECUTE
            )
            resp = dce.request(request)

            request = samr.SamrEnumerateDomainsInSamServer()
            request['ServerHandle'] = resp['ServerHandle']
            request['EnumerationContext'] = 0
            request['PreferedMaximumLength'] = 500
            dce.request(request)

            try:
                request = samr.SamrLookupDomainInSamServer()
                request['ServerHandle'] = resp['ServerHandle']
                request['Name'] = 'A' * 4500
                dce.request(request)
            except Exception as e:
                if 'STATUS_NO_SUCH_DOMAIN' not in str(e):
                    raise

        self._disconnectAfter(dce, exchange)

    def test_dceFragmentationWINNTPacketIntegrity(self):
        """Fragmented ept_lookup, WINNT auth, packet integrity."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=False)
        self._lookupAndDisconnect(dce)

    def test_dceFragmentationWINNTPacketPrivacy(self):
        """Fragmented ept_lookup, WINNT auth, packet privacy."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=False)
        self._lookupAndDisconnect(dce)

    def test_dceFragmentationKerberosPacketIntegrity(self):
        """Fragmented ept_lookup, GSS negotiate + Kerberos, packet integrity."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce)

    def test_dceFragmentationKerberosPacketPrivacy(self):
        """Fragmented ept_lookup, GSS negotiate + Kerberos, packet privacy."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce)

    # ------------------------------------------------------------------
    # Packet integrity
    # ------------------------------------------------------------------

    def test_WINNTPacketIntegrity(self):
        """ept_lookup with password, WINNT auth, packet integrity."""
        dce = self.connectDCE(self.username, self.password, self.domain, auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY,
                              dceAuth=True, doKerberos=False)
        self._lookupAndDisconnect(dce)

    def test_KerberosPacketIntegrity(self):
        """Two ept_lookups with password, Kerberos, packet integrity."""
        dce = self.connectDCE(self.username, self.password, self.domain, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_HashesWINNTPacketIntegrity(self):
        """ept_lookup with hashes, WINNT auth, packet integrity."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=False)
        self._lookupAndDisconnect(dce)

    def test_HashesKerberosPacketIntegrity(self):
        """Two ept_lookups with hashes, Kerberos, packet integrity."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_Aes128KerberosPacketIntegrity(self):
        """Two ept_lookups with the AES-128 key, Kerberos, packet integrity."""
        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey128,
                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY,
                              dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_Aes256KerberosPacketIntegrity(self):
        """Two ept_lookups with the AES-256 key, Kerberos, packet integrity."""
        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey256,
                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY,
                              dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_packetAnonWINNTPacketIntegrity(self):
        """Anonymous ept_lookup with packet integrity."""
        # With SMB Transport this will fail with STATUS_ACCESS_DENIED
        self._anonLookup(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY)

    # ------------------------------------------------------------------
    # Packet privacy
    # ------------------------------------------------------------------

    def test_WINNTPacketPrivacy(self):
        """Two ept_lookups with password, WINNT auth, packet privacy."""
        dce = self.connectDCE(self.username, self.password, self.domain, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
                              dceAuth=True, doKerberos=False)
        self._lookupAndDisconnect(dce, repeat=2)

    def test_KerberosPacketPrivacy(self):
        """Two ept_lookups with password, Kerberos, packet privacy."""
        dce = self.connectDCE(self.username, self.password, self.domain, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_HashesWINNTPacketPrivacy(self):
        """ept_lookup with hashes, WINNT auth, packet privacy."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
                              dceAuth=True, doKerberos=False)
        self._lookupAndDisconnect(dce)

    def test_HashesKerberosPacketPrivacy(self):
        """Two ept_lookups with hashes, Kerberos, packet privacy."""
        lmhash, nthash = self.hashes.split(':')
        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_Aes128KerberosPacketPrivacy(self):
        """Two ept_lookups with the AES-128 key, Kerberos, packet privacy."""
        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey128,
                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
                              dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_Aes256KerberosPacketPrivacy(self):
        """Two ept_lookups with the AES-256 key, Kerberos, packet privacy."""
        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey256,
                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
                              dceAuth=True, doKerberos=True)
        self._lookupAndDisconnect(dce, repeat=2, dump=True)

    def test_AnonWINNTPacketPrivacy(self):
        """Anonymous ept_lookup with packet privacy."""
        # With SMB Transport this will fail with STATUS_ACCESS_DENIED
        self._anonLookup(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)


class TCPTransport(DCERPCTests):
    """Runs the DCERPC tests over an ``ncacn_ip_tcp`` string binding."""

    def setUp(self):
        DCERPCTests.setUp(self)
        # Put specific configuration for target machine with SMB1
        self._loadConfig('TCPTransport')
        self.stringBinding = r'ncacn_ip_tcp:%s' % self.machine


class SMBTransport(DCERPCTests):
    """Runs the DCERPC tests over an ``ncacn_np`` (epmapper pipe) binding."""

    def setUp(self):
        # Put specific configuration for target machine with SMB_002
        DCERPCTests.setUp(self)
        self._loadConfig('SMBTransport')
        self.stringBinding = r'ncacn_np:%s[\pipe\epmapper]' % self.machine


if __name__ == "__main__":
    import sys

    loader = unittest.TestLoader()
    if len(sys.argv) > 1:
        # First command-line argument names the test case class to run.
        testcase = sys.argv[1]
        suite = loader.loadTestsFromTestCase(globals()[testcase])
    else:
        suite = loader.loadTestsFromTestCase(TCPTransport)
        suite.addTests(loader.loadTestsFromTestCase(SMBTransport))
    unittest.TextTestRunner(verbosity=1).run(suite)