###############################################################################
# Tested so far:
#
# NetrWkstaGetInfo
# NetrWkstaSetInfo
# NetrWkstaUserEnum
# NetrWkstaTransportEnum
# NetrWkstaTransportAdd
# NetrUseAdd
# NetrUseGetInfo
# NetrUseDel
# NetrUseEnum
# NetrWorkstationStatisticsGet
# NetrGetJoinInformation
# NetrJoinDomain2
# NetrUnjoinDomain2
# NetrRenameMachineInDomain2
# NetrValidateName2
# NetrGetJoinableOUs2
# NetrAddAlternateComputerName
# NetrRemoveAlternateComputerName
# NetrSetPrimaryComputerName
# NetrEnumerateComputerNames
#
# Not yet:
#
# Shouldn't dump errors against a win7
#
################################################################################

import unittest

try:
    import ConfigParser
except ImportError:
    # Python 3 renamed the module; keep the old name so the rest of the file
    # reads the same on both interpreters.
    import configparser as ConfigParser

from impacket.dcerpc.v5 import transport
from impacket.dcerpc.v5 import wkst
from impacket.dcerpc.v5.ndr import NULL

# Transfer syntaxes handed to dce.bind() by the two concrete test cases.
_TS_NDR = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
_TS_NDR64 = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')

# Server name sent in the raw (non-helper) requests.
_SERVER_NAME = '\x00'*10

# Password buffer sent by the join/unjoin/rename tests.
_EMPTY_PASSWORD = '\x00'*512

_DORMANT_LIMIT = 'wki502_dormant_file_limit'


def _report_ignored(error, expected):
    """Print *error* unless its text contains *expected*; never raises.

    The NetrUse* tests are best effort: no RPC error makes them fail.  The
    *expected* error is the one the original comments describe as normal on
    newer OSes; anything else is reported so it does not vanish silently.
    """
    if expected not in str(error):
        print('Ignoring unexpected error: %s' % error)


class WKSTTests(unittest.TestCase):
    """WKST RPC tests, run against the server described by the subclass.

    Subclasses must set ``username``, ``password``, ``domain``, ``hashes``,
    ``stringBinding`` and ``ts`` (the transfer syntax) in ``setUp``.
    """

    def _configure_smb(self, transfer_syntax):
        """Load the 'SMBTransport' section of dcetests.cfg into attributes.

        *transfer_syntax* is stored as ``self.ts`` and later passed to
        ``dce.bind()`` by :meth:`connect`.
        """
        configFile = ConfigParser.ConfigParser()
        configFile.read('dcetests.cfg')
        self.username = configFile.get('SMBTransport', 'username')
        self.domain = configFile.get('SMBTransport', 'domain')
        self.serverName = configFile.get('SMBTransport', 'servername')
        self.password = configFile.get('SMBTransport', 'password')
        self.machine = configFile.get('SMBTransport', 'machine')
        self.hashes = configFile.get('SMBTransport', 'hashes')
        self.stringBinding = r'ncacn_np:%s[\PIPE\wkssvc]' % self.machine
        self.ts = transfer_syntax

    def connect(self):
        """Connect and bind to the WKST interface.

        Returns a ``(dce, rpctransport)`` tuple.
        """
        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
        if self.hashes:
            lmhash, nthash = self.hashes.split(':')
        else:
            lmhash = ''
            nthash = ''
        if hasattr(rpctransport, 'set_credentials'):
            # This method exists only for selected protocol sequences.
            rpctransport.set_credentials(self.username, self.password, self.domain, lmhash, nthash)
        dce = rpctransport.get_dce_rpc()
        dce.connect()
        dce.bind(wkst.MSRPC_UUID_WKST, transfer_syntax=self.ts)

        return dce, rpctransport

    def test_NetrWkstaGetInfo(self):
        dce, _ = self.connect()
        request = wkst.NetrWkstaGetInfo()
        request['ServerName'] = _SERVER_NAME
        # The same request object is reused for every information level.
        for level in (100, 101, 102, 502):
            request['Level'] = level
            resp = dce.request(request)
            resp.dump()

    def test_hNetrWkstaGetInfo(self):
        dce, _ = self.connect()
        for level in (100, 101, 102, 502):
            resp = wkst.hNetrWkstaGetInfo(dce, level)
            resp.dump()

    def test_NetrWkstaUserEnum(self):
        dce, _ = self.connect()
        request = wkst.NetrWkstaUserEnum()
        request['ServerName'] = _SERVER_NAME
        request['UserInfo']['Level'] = 0
        request['UserInfo']['WkstaUserInfo']['tag'] = 0
        request['PreferredMaximumLength'] = 8192
        resp = dce.request(request)
        resp.dump()

        request['UserInfo']['Level'] = 1
        request['UserInfo']['WkstaUserInfo']['tag'] = 1
        resp = dce.request(request)
        resp.dump()

    def test_hNetrWkstaUserEnum(self):
        dce, _ = self.connect()
        for level in (0, 1):
            resp = wkst.hNetrWkstaUserEnum(dce, level)
            resp.dump()

    def test_NetrWkstaTransportEnum(self):
        dce, _ = self.connect()
        request = wkst.NetrWkstaTransportEnum()
        request['ServerName'] = _SERVER_NAME
        request['TransportInfo']['Level'] = 0
        request['TransportInfo']['WkstaTransportInfo']['tag'] = 0
        request['PreferredMaximumLength'] = 500
        request['ResumeHandle'] = NULL
        resp = dce.request(request)
        resp.dump()

    def test_hNetrWkstaTransportEnum(self):
        dce, _ = self.connect()
        resp = wkst.hNetrWkstaTransportEnum(dce, 0)
        resp.dump()

    def test_NetrWkstaSetInfo(self):
        dce, _ = self.connect()
        request = wkst.NetrWkstaGetInfo()
        request['ServerName'] = _SERVER_NAME
        request['Level'] = 502
        resp = dce.request(request)
        resp.dump()
        oldVal = resp['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT]

        req = wkst.NetrWkstaSetInfo()
        req['ServerName'] = _SERVER_NAME
        req['Level'] = 502
        req['WkstaInfo'] = resp['WkstaInfo']
        req['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT] = 500
        resp2 = dce.request(req)
        resp2.dump()

        try:
            resp = dce.request(request)
            self.assertEqual(500, resp['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT])
        finally:
            # Put the original value back even when the check above fails,
            # so a failing run does not leave the server modified.
            req['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT] = oldVal
            resp2 = dce.request(req)
            resp2.dump()

    def test_hNetrWkstaSetInfo(self):
        dce, _ = self.connect()
        resp = wkst.hNetrWkstaGetInfo(dce, 502)
        resp.dump()
        oldVal = resp['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT]

        resp['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT] = 500
        resp2 = wkst.hNetrWkstaSetInfo(dce, 502, resp['WkstaInfo']['WkstaInfo502'])
        resp2.dump()

        try:
            resp = wkst.hNetrWkstaGetInfo(dce, 502)
            resp.dump()
            self.assertEqual(500, resp['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT])
        finally:
            # Put the original value back even when the check above fails,
            # so a failing run does not leave the server modified.
            resp['WkstaInfo']['WkstaInfo502'][_DORMANT_LIMIT] = oldVal
            resp2 = wkst.hNetrWkstaSetInfo(dce, 502, resp['WkstaInfo']['WkstaInfo502'])
            resp2.dump()

    def test_NetrWkstaTransportAdd(self):
        dce, _ = self.connect()

        req = wkst.NetrWkstaTransportAdd()
        req['ServerName'] = _SERVER_NAME
        req['Level'] = 0
        req['TransportInfo']['wkti0_transport_name'] = 'BETO\x00'
        req['TransportInfo']['wkti0_transport_address'] = '000C29BC5CE5\x00'
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_INVALID_FUNCTION' not in str(e):
                raise

    def test_hNetrUseAdd_hNetrUseDel_hNetrUseGetInfo_hNetrUseEnum(self):
        dce, _ = self.connect()

        info1 = wkst.LPUSE_INFO_1()

        info1['ui1_local'] = 'Z:\x00'
        info1['ui1_remote'] = '\\\\127.0.0.1\\c$\x00'
        info1['ui1_password'] = NULL
        try:
            resp = wkst.hNetrUseAdd(dce, 1, info1)
            resp.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'rpc_s_access_denied')

        # We're not testing this call with NDR64, it fails and I can't see the contents
        if self.ts == _TS_NDR64:
            return

        try:
            resp = wkst.hNetrUseEnum(dce, 2)
            resp.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'STATUS_PIPE_DISCONNECTED')

        try:
            resp2 = wkst.hNetrUseGetInfo(dce, 'Z:', 3)
            resp2.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'STATUS_PIPE_DISCONNECTED')

        try:
            resp = wkst.hNetrUseDel(dce, 'Z:')
            resp.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'STATUS_PIPE_DISCONNECTED')

    def test_NetrUseAdd_NetrUseDel_NetrUseGetInfo_NetrUseEnum(self):
        dce, _ = self.connect()

        req = wkst.NetrUseAdd()
        req['ServerName'] = _SERVER_NAME
        req['Level'] = 1
        req['InfoStruct']['tag'] = 1
        req['InfoStruct']['UseInfo1']['ui1_local'] = 'Z:\x00'
        req['InfoStruct']['UseInfo1']['ui1_remote'] = '\\\\127.0.0.1\\c$\x00'
        req['InfoStruct']['UseInfo1']['ui1_password'] = NULL
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'rpc_s_access_denied')

        # We're not testing this call with NDR64, it fails and I can't see the contents
        if self.ts == _TS_NDR64:
            return

        req = wkst.NetrUseEnum()
        req['ServerName'] = NULL
        req['InfoStruct']['Level'] = 2
        req['InfoStruct']['UseInfo']['tag'] = 2
        req['InfoStruct']['UseInfo']['Level2']['Buffer'] = NULL
        req['PreferredMaximumLength'] = 0xffffffff
        req['ResumeHandle'] = NULL
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'rpc_s_access_denied')

        req = wkst.NetrUseGetInfo()
        req['ServerName'] = _SERVER_NAME
        req['UseName'] = 'Z:\x00'
        req['Level'] = 3
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'rpc_s_access_denied')

        req = wkst.NetrUseDel()
        req['ServerName'] = _SERVER_NAME
        req['UseName'] = 'Z:\x00'
        req['ForceLevel'] = wkst.USE_LOTS_OF_FORCE
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            # This could happen in newer OSes
            _report_ignored(e, 'rpc_s_access_denied')

    def test_NetrWorkstationStatisticsGet(self):
        dce, _ = self.connect()

        req = wkst.NetrWorkstationStatisticsGet()
        req['ServerName'] = _SERVER_NAME
        req['ServiceName'] = '\x00'
        req['Level'] = 0
        req['Options'] = 0
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_INVALID_PARAMETER' not in str(e):
                raise

    def test_hNetrWorkstationStatisticsGet(self):
        dce, _ = self.connect()

        try:
            resp2 = wkst.hNetrWorkstationStatisticsGet(dce, '\x00', 0, 0)
            resp2.dump()
        except Exception as e:
            if 'ERROR_INVALID_PARAMETER' not in str(e):
                raise

    def test_NetrGetJoinInformation(self):
        dce, _ = self.connect()

        req = wkst.NetrGetJoinInformation()
        req['ServerName'] = _SERVER_NAME
        req['NameBuffer'] = '\x00'

        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_INVALID_PARAMETER' not in str(e):
                raise

    def test_hNetrGetJoinInformation(self):
        dce, _ = self.connect()

        try:
            resp = wkst.hNetrGetJoinInformation(dce, '\x00')
            resp.dump()
        except Exception as e:
            if 'ERROR_INVALID_PARAMETER' not in str(e):
                raise

    def test_NetrJoinDomain2(self):
        dce, _ = self.connect()

        req = wkst.NetrJoinDomain2()
        req['ServerName'] = _SERVER_NAME
        req['DomainNameParam'] = '172.16.123.1\\FREEFLY\x00'
        req['MachineAccountOU'] = 'OU=BETUS,DC=FREEFLY\x00'
        req['AccountName'] = NULL
        req['Password']['Buffer'] = _EMPTY_PASSWORD
        req['Options'] = wkst.NETSETUP_DOMAIN_JOIN_IF_JOINED
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_hNetrJoinDomain2(self):
        dce, _ = self.connect()

        try:
            resp = wkst.hNetrJoinDomain2(dce, '172.16.123.1\\FREEFLY\x00', 'OU=BETUS,DC=FREEFLY\x00', NULL,
                                         _EMPTY_PASSWORD, wkst.NETSETUP_DOMAIN_JOIN_IF_JOINED)
            resp.dump()
        except Exception as e:
            if 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_NetrUnjoinDomain2(self):
        dce, _ = self.connect()

        req = wkst.NetrUnjoinDomain2()
        req['ServerName'] = _SERVER_NAME
        req['AccountName'] = NULL
        req['Password']['Buffer'] = _EMPTY_PASSWORD
        req['Options'] = wkst.NETSETUP_ACCT_DELETE
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_hNetrUnjoinDomain2(self):
        dce, _ = self.connect()

        try:
            resp = wkst.hNetrUnjoinDomain2(dce, NULL, _EMPTY_PASSWORD, wkst.NETSETUP_ACCT_DELETE)
            resp.dump()
        except Exception as e:
            if 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_NetrRenameMachineInDomain2(self):
        dce, _ = self.connect()

        req = wkst.NetrRenameMachineInDomain2()
        req['ServerName'] = _SERVER_NAME
        req['MachineName'] = 'BETUS\x00'
        req['AccountName'] = NULL
        req['Password']['Buffer'] = _EMPTY_PASSWORD
        req['Options'] = wkst.NETSETUP_ACCT_CREATE
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_hNetrRenameMachineInDomain2(self):
        dce, _ = self.connect()

        try:
            resp = wkst.hNetrRenameMachineInDomain2(dce, 'BETUS\x00', NULL, _EMPTY_PASSWORD,
                                                    wkst.NETSETUP_ACCT_CREATE)
            resp.dump()
        except Exception as e:
            if 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_NetrValidateName2(self):
        dce, _ = self.connect()

        req = wkst.NetrValidateName2()
        req['ServerName'] = _SERVER_NAME
        req['NameToValidate'] = 'BETO\x00'
        req['AccountName'] = NULL
        req['Password'] = NULL
        req['NameType'] = wkst.NETSETUP_NAME_TYPE.NetSetupDomain
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if '0x8001011c' not in str(e):
                raise

    def test_hNetrValidateName2(self):
        dce, _ = self.connect()

        try:
            resp2 = wkst.hNetrValidateName2(dce, 'BETO\x00', NULL, NULL, wkst.NETSETUP_NAME_TYPE.NetSetupDomain)
            resp2.dump()
        except Exception as e:
            if '0x8001011c' not in str(e):
                raise

    def test_NetrGetJoinableOUs2(self):
        dce, _ = self.connect()

        req = wkst.NetrGetJoinableOUs2()
        req['ServerName'] = _SERVER_NAME
        req['DomainNameParam'] = 'FREEFLY\x00'
        req['AccountName'] = NULL
        req['Password'] = NULL
        req['OUCount'] = 0
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if '0x8001011c' not in str(e):
                raise

    def test_hNetrGetJoinableOUs2(self):
        dce, _ = self.connect()

        try:
            resp = wkst.hNetrGetJoinableOUs2(dce, 'FREEFLY\x00', NULL, NULL, 0)
            resp.dump()
        except Exception as e:
            if '0x8001011c' not in str(e):
                raise

    def test_NetrAddAlternateComputerName(self):
        dce, _ = self.connect()

        req = wkst.NetrAddAlternateComputerName()
        req['ServerName'] = _SERVER_NAME
        req['AlternateName'] = 'FREEFLY\x00'
        req['DomainAccount'] = NULL
        req['EncryptedPassword'] = NULL
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e) and 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_hNetrAddAlternateComputerName(self):
        dce, _ = self.connect()

        try:
            resp2 = wkst.hNetrAddAlternateComputerName(dce, 'FREEFLY\x00', NULL, NULL)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e) and 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_NetrRemoveAlternateComputerName(self):
        dce, _ = self.connect()

        req = wkst.NetrRemoveAlternateComputerName()
        req['ServerName'] = _SERVER_NAME
        req['AlternateName'] = 'FREEFLY\x00'
        req['DomainAccount'] = NULL
        req['EncryptedPassword'] = NULL
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e) and 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_hNetrRemoveAlternateComputerName(self):
        dce, _ = self.connect()

        try:
            resp2 = wkst.hNetrRemoveAlternateComputerName(dce, 'FREEFLY\x00', NULL, NULL)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e) and 'ERROR_INVALID_PASSWORD' not in str(e):
                raise

    def test_NetrSetPrimaryComputerName(self):
        dce, _ = self.connect()

        req = wkst.NetrSetPrimaryComputerName()
        req['ServerName'] = _SERVER_NAME
        req['PrimaryName'] = 'FREEFLY\x00'
        req['DomainAccount'] = NULL
        req['EncryptedPassword'] = NULL
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e) and 'ERROR_INVALID_PARAMETER' not in str(e):
                raise

    def test_hNetrSetPrimaryComputerName(self):
        dce, _ = self.connect()

        try:
            resp2 = wkst.hNetrSetPrimaryComputerName(dce, 'FREEFLY\x00', NULL, NULL)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e) and 'ERROR_INVALID_PARAMETER' not in str(e):
                raise

    def test_NetrEnumerateComputerNames(self):
        dce, _ = self.connect()

        req = wkst.NetrEnumerateComputerNames()
        req['ServerName'] = _SERVER_NAME
        req['NameType'] = wkst.NET_COMPUTER_NAME_TYPE.NetAllComputerNames
        try:
            resp2 = dce.request(req)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e):
                raise

    def test_hNetrEnumerateComputerNames(self):
        dce, _ = self.connect()

        try:
            resp2 = wkst.hNetrEnumerateComputerNames(dce, wkst.NET_COMPUTER_NAME_TYPE.NetAllComputerNames)
            resp2.dump()
        except Exception as e:
            if 'ERROR_NOT_SUPPORTED' not in str(e):
                raise


class SMBTransport(WKSTTests):
    """Runs the WKST tests over the wkssvc named pipe with the 2.0 transfer syntax."""

    def setUp(self):
        WKSTTests.setUp(self)
        self._configure_smb(_TS_NDR)


class SMBTransport64(WKSTTests):
    """Same as SMBTransport, but binds with the NDR64 transfer syntax."""

    def setUp(self):
        WKSTTests.setUp(self)
        self._configure_smb(_TS_NDR64)


# Process command-line arguments.
if __name__ == '__main__':
    import sys
    if len(sys.argv) > 1:
        # A single test case class can be selected by name.
        testcase = sys.argv[1]
        suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase])
    else:
        suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport)
        suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport64))
    unittest.TextTestRunner(verbosity=1).run(suite)