1############################################################################### 2# Tested so far: 3# 4# DRSBind 5# DRSDomainControllerInfo 6# hDRSDomainControllerInfo 7# DRSCrackNames 8# hDRSCrackNames 9# DRSGetNT4ChangeLog 10# DRSVerifyName 11# 12# Not yet: 13# 14# Shouldn't dump errors against a win7 15# 16################################################################################ 17 18import unittest 19import ConfigParser 20 21from impacket.dcerpc.v5 import transport, epm 22from impacket.dcerpc.v5 import drsuapi 23from impacket.dcerpc.v5.dtypes import NULL, LPWSTR 24from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY 25 26 27class DRSRTests(unittest.TestCase): 28 def connect(self): 29 rpctransport = transport.DCERPCTransportFactory(self.stringBinding ) 30 if len(self.hashes) > 0: 31 lmhash, nthash = self.hashes.split(':') 32 else: 33 lmhash = '' 34 nthash = '' 35 if hasattr(rpctransport, 'set_credentials'): 36 # This method exists only for selected protocol sequences. 37 rpctransport.set_credentials(self.username,self.password, self.domain, lmhash, nthash) 38 dce = rpctransport.get_dce_rpc() 39 dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY) 40 dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) 41 dce.connect() 42 dce.bind(drsuapi.MSRPC_UUID_DRSUAPI, transfer_syntax = self.ts) 43 44 request = drsuapi.DRSBind() 45 request['puuidClientDsa'] = drsuapi.NTDSAPI_CLIENT_GUID 46 drs = drsuapi.DRS_EXTENSIONS_INT() 47 drs['cb'] = len(drs) #- 4 48 drs['dwFlags'] = drsuapi.DRS_EXT_GETCHGREQ_V6 | drsuapi.DRS_EXT_GETCHGREPLY_V6 | drsuapi.DRS_EXT_GETCHGREQ_V8 | drsuapi.DRS_EXT_STRONG_ENCRYPTION 49 drs['SiteObjGuid'] = drsuapi.NULLGUID 50 drs['Pid'] = 0 51 drs['dwReplEpoch'] = 0 52 drs['dwFlagsExt'] = drsuapi.DRS_EXT_RECYCLE_BIN 53 drs['ConfigObjGUID'] = drsuapi.NULLGUID 54 drs['dwExtCaps'] = 0 55 request['pextClient']['cb'] = len(drs) 56 request['pextClient']['rgb'] = list(str(drs)) 57 resp = dce.request(request) 58 59 # Let's dig into the answer to check the dwReplEpoch. This field should match the one we send as part of 60 # DRSBind's DRS_EXTENSIONS_INT(). If not, it will fail later when trying to sync data. 61 drsExtensionsInt = drsuapi.DRS_EXTENSIONS_INT() 62 63 # If dwExtCaps is not included in the answer, let's just add it so we can unpack DRS_EXTENSIONS_INT right. 64 ppextServer = ''.join(resp['ppextServer']['rgb']) + '\x00' * ( 65 len(drsuapi.DRS_EXTENSIONS_INT()) - resp['ppextServer']['cb']) 66 drsExtensionsInt.fromString(ppextServer) 67 68 if drsExtensionsInt['dwReplEpoch'] != 0: 69 # Different epoch, we have to call DRSBind again 70 drs['dwReplEpoch'] = drsExtensionsInt['dwReplEpoch'] 71 request['pextClient']['cb'] = len(drs) 72 request['pextClient']['rgb'] = list(str(drs)) 73 resp = dce.request(request) 74 75 resp2 = drsuapi.hDRSDomainControllerInfo(dce, resp['phDrs'], self.domain, 2) 76 77 return dce, rpctransport, resp['phDrs'], resp2['pmsgOut']['V2']['rItems'][0]['NtdsDsaObjectGuid'] 78 79 def connect2(self): 80 rpctransport = transport.DCERPCTransportFactory(self.stringBinding ) 81 if len(self.hashes) > 0: 82 lmhash, nthash = self.hashes.split(':') 83 else: 84 lmhash = '' 85 nthash = '' 86 if hasattr(rpctransport, 'set_credentials'): 87 # This method exists only for selected protocol sequences. 88 rpctransport.set_credentials(self.username,self.password, self.domain, lmhash, nthash) 89 dce = rpctransport.get_dce_rpc() 90 dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY) 91 #dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY) 92 dce.connect() 93 dce.bind(drsuapi.MSRPC_UUID_DRSUAPI, transfer_syntax = self.ts) 94 95 return dce, rpctransport 96 97 def test_DRSBind(self): 98 dce, rpctransport, _,_ = self.connect() 99 100 request = drsuapi.DRSBind() 101 request['puuidClientDsa'] = drsuapi.NTDSAPI_CLIENT_GUID 102 drs = drsuapi.DRS_EXTENSIONS_INT() 103 drs['cb'] = len(drs) - 4 104 drs['dwFlags'] = 0 105 drs['SiteObjGuid'] = drsuapi.NULLGUID 106 drs['Pid'] = 0x1234 107 drs['dwReplEpoch'] = 0 108 drs['dwFlagsExt'] = drsuapi.DRS_EXT_RECYCLE_BIN 109 drs['ConfigObjGUID'] = drsuapi.NULLGUID 110 drs['dwExtCaps'] = 0 111 request['pextClient']['cb'] = len(drs) 112 request['pextClient']['rgb'] = list(str(drs)) 113 resp = dce.request(request) 114 resp.dump() 115 116 extension = drsuapi.DRS_EXTENSIONS_INT('\x00'*4 + ''.join(resp['ppextServer']['rgb'])+'\x00'*4) 117 extension.dump() 118 119 def test_DRSDomainControllerInfo(self): 120 dce, rpctransport, hDrs, DsaObjDest = self.connect() 121 122 request = drsuapi.DRSDomainControllerInfo() 123 request['hDrs'] = hDrs 124 request['dwInVersion'] = 1 125 126 request['pmsgIn']['tag'] = 1 127 request['pmsgIn']['V1']['Domain'] = self.domain + '\x00' 128 request['pmsgIn']['V1']['InfoLevel'] = 1 129 130 resp = dce.request(request) 131 resp.dump() 132 133 request['pmsgIn']['V1']['InfoLevel'] = 2 134 resp = dce.request(request) 135 resp.dump() 136 137 request['pmsgIn']['V1']['InfoLevel'] = 3 138 resp = dce.request(request) 139 resp.dump() 140 141 request['pmsgIn']['V1']['InfoLevel'] = 0xffffffff 142 resp = dce.request(request) 143 resp.dump() 144 145 def test_hDRSDomainControllerInfo(self): 146 dce, rpctransport, hDrs, DsaObjDest = self.connect() 147 148 resp = drsuapi.hDRSDomainControllerInfo(dce, hDrs, self.domain, 1) 149 resp.dump() 150 151 resp = drsuapi.hDRSDomainControllerInfo(dce, hDrs, self.domain, 2) 152 resp.dump() 153 154 resp = drsuapi.hDRSDomainControllerInfo(dce, hDrs, self.domain, 3) 155 resp.dump() 156 157 resp = drsuapi.hDRSDomainControllerInfo(dce, hDrs, self.domain, 0xffffffff) 158 resp.dump() 159 160 def test_DRSCrackNames(self): 161 dce, rpctransport, hDrs, DsaObjDest = self.connect() 162 163 request = drsuapi.DRSCrackNames() 164 request['hDrs'] = hDrs 165 request['dwInVersion'] = 1 166 167 request['pmsgIn']['tag'] = 1 168 request['pmsgIn']['V1']['CodePage'] = 0 169 request['pmsgIn']['V1']['LocaleId'] = 0 170 request['pmsgIn']['V1']['dwFlags'] = 0 171 request['pmsgIn']['V1']['formatOffered'] = drsuapi.DS_NT4_ACCOUNT_NAME_SANS_DOMAIN 172 request['pmsgIn']['V1']['formatDesired'] = drsuapi.DS_USER_PRINCIPAL_NAME_FOR_LOGON 173 request['pmsgIn']['V1']['cNames'] = 1 174 name = LPWSTR() 175 #name['Data'] = 'FREEFLY-DC\x00' 176 #name['Data'] = 'CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=FREEFLY,DC=NET\x00' 177 #name['Data'] = 'CN=FREEFLY-DC,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=FREEFLY,DC=NET\x00' 178 name['Data'] = 'Administrator\x00' 179 request['pmsgIn']['V1']['rpNames'].append(name) 180 181 resp = dce.request(request) 182 resp.dump() 183 184 def test_hDRSCrackNames(self): 185 dce, rpctransport, hDrs, DsaObjDest = self.connect() 186 187 name = 'Administrator' 188 formatOffered = drsuapi.DS_NT4_ACCOUNT_NAME_SANS_DOMAIN 189 formatDesired = drsuapi.DS_STRING_SID_NAME 190 191 resp = drsuapi.hDRSCrackNames(dce, hDrs, 0, formatOffered, formatDesired, (name,)) 192 resp.dump() 193 194 name = 'CN=NTDS Settings,CN=DC1-WIN2012,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=%s,DC=%s' % (self.domain.split('.')[0],self.domain.split('.')[1]) 195 resp = drsuapi.hDRSCrackNames(dce, hDrs, 0, drsuapi.DS_NAME_FORMAT.DS_FQDN_1779_NAME, drsuapi.DS_NAME_FORMAT.DS_UNIQUE_ID_NAME, (name,)) 196 resp.dump() 197 198 name = 'CN=NTDS Settings,CN=DC1-WIN2012,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=%s,DC=%s' % (self.domain.split('.')[0],self.domain.split('.')[1]) 199 resp = drsuapi.hDRSCrackNames(dce, hDrs, 0, drsuapi.DS_NAME_FORMAT.DS_FQDN_1779_NAME, drsuapi.DS_STRING_SID_NAME, (name,)) 200 resp.dump() 201 202 name = self.domain.upper() 203 #name = '' 204 resp = drsuapi.hDRSCrackNames(dce, hDrs, 0, drsuapi.DS_LIST_ROLES, drsuapi.DS_NAME_FORMAT.DS_FQDN_1779_NAME, (name,)) 205 resp.dump() 206 207 def test_DRSGetNT4ChangeLog(self): 208 dce, rpctransport, hDrs, DsaObjDest = self.connect() 209 210 request = drsuapi.DRSGetNT4ChangeLog() 211 request['hDrs'] = hDrs 212 request['dwInVersion'] = 1 213 214 request['pmsgIn']['tag'] = 1 215 request['pmsgIn']['V1']['dwFlags'] = drsuapi.DRS_NT4_CHGLOG_GET_CHANGE_LOG | drsuapi.DRS_NT4_CHGLOG_GET_SERIAL_NUMBERS 216 request['pmsgIn']['V1']['PreferredMaximumLength'] = 0x4000 217 request['pmsgIn']['V1']['cbRestart'] = 0 218 request['pmsgIn']['V1']['pRestart'] = NULL 219 220 try: 221 resp = dce.request(request) 222 resp.dump() 223 except Exception, e: 224 if str(e).find('ERROR_NOT_SUPPORTED') <0: 225 raise 226 227 def test_DRSVerifyNames(self): 228 dce, rpctransport, hDrs, DsaObjDest = self.connect() 229 request = drsuapi.DRSVerifyNames() 230 231 request['hDrs'] = hDrs 232 request['dwInVersion'] = 1 233 234 request['pmsgIn']['tag'] = 1 235 request['pmsgIn']['V1']['dwFlags'] = drsuapi.DRS_VERIFY_DSNAMES 236 request['pmsgIn']['V1']['cNames'] = 1 237 request['pmsgIn']['V1']['PrefixTable']['pPrefixEntry'] = NULL 238 239 dsName = drsuapi.PDSNAME() 240 dsName['SidLen'] = 0 241 dsName['Guid'] = drsuapi.NULLGUID 242 dsName['Sid'] = '' 243 name = 'DC=%s,DC=%s' % (self.domain.split('.')[0],self.domain.split('.')[1]) 244 245 dsName['NameLen'] = len(name) 246 dsName['StringName'] = (name + '\x00') 247 dsName['structLen'] = len(dsName.getDataReferent())-4 248 249 request['pmsgIn']['V1']['rpNames'].append(dsName) 250 251 resp = dce.request(request) 252 resp.dump() 253 254 def test_DRSGetNCChanges(self): 255 # Not yet working 256 dce, rpctransport, hDrs, DsaObjDest = self.connect() 257 258 request = drsuapi.DRSGetNCChanges() 259 request['hDrs'] = hDrs 260 request['dwInVersion'] = 8 261 262 request['pmsgIn']['tag'] = 8 263 request['pmsgIn']['V8']['uuidDsaObjDest'] = DsaObjDest 264 request['pmsgIn']['V8']['uuidInvocIdSrc'] = DsaObjDest 265 #request['pmsgIn']['V8']['pNC'] = NULL 266 267 dsName = drsuapi.DSNAME() 268 dsName['SidLen'] = 0 269 dsName['Guid'] = drsuapi.NULLGUID 270 dsName['Sid'] = '' 271 name = 'DC=%s,DC=%s' % (self.domain.split('.')[0],self.domain.split('.')[1]) 272 dsName['NameLen'] = len(name) 273 dsName['StringName'] = (name + '\x00') 274 275 dsName['structLen'] = len(dsName.getData()) 276 277 request['pmsgIn']['V8']['pNC'] = dsName 278 279 request['pmsgIn']['V8']['usnvecFrom']['usnHighObjUpdate'] = 0 280 request['pmsgIn']['V8']['usnvecFrom']['usnHighPropUpdate'] = 0 281 282 request['pmsgIn']['V8']['pUpToDateVecDest'] = NULL 283 284 request['pmsgIn']['V8']['ulFlags'] = drsuapi.DRS_INIT_SYNC | drsuapi.DRS_PER_SYNC #| drsuapi.DRS_CRITICAL_ONLY 285 request['pmsgIn']['V8']['cMaxObjects'] = 100 286 request['pmsgIn']['V8']['cMaxBytes'] = 0 287 request['pmsgIn']['V8']['ulExtendedOp'] = drsuapi.EXOP_REPL_OBJ | drsuapi.EXOP_REPL_SECRETS 288 289 prefixTable = [] 290 oid1 = drsuapi.MakeAttid(prefixTable, '1.2.840.113556.1.4.656') # principalName 291 oid2 = drsuapi.MakeAttid(prefixTable, '1.2.840.113556.1.4.221') #'sAMAccountName' 292 oid3 = drsuapi.MakeAttid(prefixTable, '1.2.840.113556.1.4.90') # 'unicodePwd' 293 oid4 = drsuapi.MakeAttid(prefixTable, '1.2.840.113556.1.4.94') # ntPwdHistory 294 oid5 = drsuapi.MakeAttid(prefixTable, '1.2.840.113556.1.4.160') # lmPwdHistory 295 oid6 = drsuapi.MakeAttid(prefixTable, '1.2.840.113556.1.4.125') # supplementalCreds 296 oid7 = drsuapi.MakeAttid(prefixTable, '1.2.840.113556.1.4.146') # objectSid 297 298 request['pmsgIn']['V8']['pPartialAttrSet']['dwVersion'] = 1 299 request['pmsgIn']['V8']['pPartialAttrSet']['cAttrs'] = 7 300 request['pmsgIn']['V8']['pPartialAttrSet']['rgPartialAttr'].append(oid1) 301 request['pmsgIn']['V8']['pPartialAttrSet']['rgPartialAttr'].append(oid2) 302 request['pmsgIn']['V8']['pPartialAttrSet']['rgPartialAttr'].append(oid3) 303 request['pmsgIn']['V8']['pPartialAttrSet']['rgPartialAttr'].append(oid4) 304 request['pmsgIn']['V8']['pPartialAttrSet']['rgPartialAttr'].append(oid5) 305 request['pmsgIn']['V8']['pPartialAttrSet']['rgPartialAttr'].append(oid6) 306 request['pmsgIn']['V8']['pPartialAttrSet']['rgPartialAttr'].append(oid7) 307 request['pmsgIn']['V8']['pPartialAttrSetEx1'] = NULL 308 request['pmsgIn']['V8']['PrefixTableDest']['PrefixCount'] = len(prefixTable) 309 request['pmsgIn']['V8']['PrefixTableDest']['pPrefixEntry'] = prefixTable 310 311 resp = dce.request(request) 312 resp.dump() 313 314 #moreData = 1 315 #while moreData > 0: 316 # thisObject = resp['pmsgOut']['V6']['pObjects'] 317 # done = False 318 # while not done: 319 # nextObject = thisObject['pNextEntInf'] 320 # thisObject['pNextEntInf'] = NULL 321 # thisObject.dump() 322 # print thisObject['Entinf']['pName']['StringName'] 323 # thisObject = nextObject 324 # if nextObject is '': 325 # done = True 326 # request['pmsgIn']['V8']['uuidInvocIdSrc'] = resp['pmsgOut']['V6']['uuidInvocIdSrc'] 327 # request['pmsgIn']['V8']['usnvecFrom'] = resp['pmsgOut']['V6']['usnvecTo'] 328 # resp = dce.request(request) 329 # moreData = resp['pmsgOut']['V6']['fMoreData'] 330 #print "OBJECTS ", resp['pmsgOut']['V6']['cNumObjects'] 331 332 def getMoreData(self, dce, request, resp): 333 while resp['pmsgOut']['V6']['fMoreData'] > 0: 334 #thisObject = resp['pmsgOut']['V6']['pObjects'] 335 #done = False 336 #while not done: 337 # nextObject = thisObject['pNextEntInf'] 338 # thisObject['pNextEntInf'] = NULL 339 #thisObject.dump() 340 #print '\n' 341 #print thisObject['Entinf']['pName']['StringName'] 342 # thisObject = nextObject 343 # if nextObject is '': 344 # done = True 345 346 request['pmsgIn']['V10']['uuidInvocIdSrc'] = resp['pmsgOut']['V6'] 347 request['pmsgIn']['V10']['usnvecFrom'] = resp['pmsgOut']['V6']['usnvecTo'] 348 resp = dce.request(request) 349 resp.dump() 350 print '\n' 351 352 353 def test_DRSGetNCChanges2(self): 354 # Not yet working 355 dce, rpctransport, hDrs, DsaObjDest = self.connect() 356 357 request = drsuapi.DRSGetNCChanges() 358 request['hDrs'] = hDrs 359 request['dwInVersion'] = 10 360 361 request['pmsgIn']['tag'] =10 362 request['pmsgIn']['V10']['uuidDsaObjDest'] = DsaObjDest 363 request['pmsgIn']['V10']['uuidInvocIdSrc'] = drsuapi.NULLGUID 364 #request['pmsgIn']['V10']['pNC'] = NULL 365 366 dsName = drsuapi.DSNAME() 367 dsName['SidLen'] = 0 368 dsName['Guid'] = drsuapi.NULLGUID 369 dsName['Sid'] = '' 370 371 name = 'CN=Schema,CN=Configuration,DC=%s,DC=%s' % (self.domain.split('.')[0],self.domain.split('.')[1]) 372 dsName['NameLen'] = len(name) 373 dsName['StringName'] = (name + '\x00') 374 375 dsName['structLen'] = len(dsName.getData()) 376 377 request['pmsgIn']['V10']['pNC'] = dsName 378 379 request['pmsgIn']['V10']['usnvecFrom']['usnHighObjUpdate'] = 0 380 request['pmsgIn']['V10']['usnvecFrom']['usnHighPropUpdate'] = 0 381 382 request['pmsgIn']['V10']['pUpToDateVecDest'] = NULL 383 384 request['pmsgIn']['V10']['ulFlags'] = drsuapi.DRS_INIT_SYNC | drsuapi.DRS_PER_SYNC | drsuapi.DRS_WRIT_REP | drsuapi.DRS_FULL_SYNC_NOW 385 request['pmsgIn']['V10']['cMaxObjects'] = 100 386 request['pmsgIn']['V10']['cMaxBytes'] = 0 387 request['pmsgIn']['V10']['ulExtendedOp'] = 0 388 request['pmsgIn']['V10']['pPartialAttrSet'] = NULL 389 request['pmsgIn']['V10']['pPartialAttrSetEx1'] = NULL 390 request['pmsgIn']['V10']['PrefixTableDest']['pPrefixEntry'] = NULL 391 #request['pmsgIn']['V10']['ulMoreFlags'] = 0 392 resp = dce.request(request) 393 print resp['pmsgOut']['V6']['pNC']['StringName'] 394 resp.dump() 395 print '\n' 396 self.getMoreData(dce, request, resp) 397 398 dsName = drsuapi.DSNAME(isNDR64=request._isNDR64) 399 dsName['SidLen'] = 0 400 dsName['Guid'] = drsuapi.NULLGUID 401 dsName['Sid'] = '' 402 403 name = 'DC=%s,DC=%s' % (self.domain.split('.')[0],self.domain.split('.')[1]) 404 dsName['NameLen'] = len(name) 405 dsName['StringName'] = (name + '\x00') 406 407 dsName['structLen'] = len(dsName.getData()) 408 409 request['pmsgIn']['V10']['pNC'] = dsName 410 resp = dce.request(request) 411 print resp['pmsgOut']['V6']['pNC']['StringName'] 412 resp.dump() 413 print '\n' 414 self.getMoreData(dce, request, resp) 415 416 dsName = drsuapi.DSNAME(isNDR64=request._isNDR64) 417 dsName['SidLen'] = 0 418 dsName['Guid'] = drsuapi.NULLGUID 419 dsName['Sid'] = '' 420 421 name = 'CN=Configuration,DC=%s,DC=%s' % (self.domain.split('.')[0],self.domain.split('.')[1]) 422 dsName['NameLen'] = len(name) 423 dsName['StringName'] = (name + '\x00') 424 425 dsName['structLen'] = len(dsName.getData()) 426 427 request['pmsgIn']['V10']['pNC'] = dsName 428 resp = dce.request(request) 429 print resp['pmsgOut']['V6']['pNC']['StringName'] 430 resp.dump() 431 print '\n' 432 self.getMoreData(dce, request, resp) 433 434 #while resp['pmsgOut']['V6']['fMoreData'] > 0: 435 # thisObject = resp['pmsgOut']['V6']['pObjects'] 436 # done = False 437 # while not done: 438 # nextObject = thisObject['pNextEntInf'] 439 # thisObject['pNextEntInf'] = NULL 440 # #thisObject.dump() 441 # #print '\n' 442 # #print thisObject['Entinf']['pName']['StringName'] 443 # thisObject = nextObject 444 # if nextObject is '': 445 # done = True 446# 447# print "B"*80 448# request['pmsgIn']['V10']['uuidInvocIdSrc'] = resp['pmsgOut']['V6'] 449# request['pmsgIn']['V10']['usnvecFrom'] = resp['pmsgOut']['V6']['usnvecTo'] 450# resp = dce.request(request) 451 452 453class SMBTransport(DRSRTests): 454 def setUp(self): 455 DRSRTests.setUp(self) 456 configFile = ConfigParser.ConfigParser() 457 configFile.read('dcetests.cfg') 458 self.username = configFile.get('SMBTransport', 'username') 459 self.domain = configFile.get('SMBTransport', 'domain') 460 self.serverName = configFile.get('SMBTransport', 'servername') 461 self.password = configFile.get('SMBTransport', 'password') 462 self.machine = configFile.get('SMBTransport', 'machine') 463 self.hashes = configFile.get('SMBTransport', 'hashes') 464 self.stringBinding = r'ncacn_np:%s[\PIPE\lsass]' % self.machine 465 self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0') 466 467class SMBTransport64(DRSRTests): 468 def setUp(self): 469 DRSRTests.setUp(self) 470 configFile = ConfigParser.ConfigParser() 471 configFile.read('dcetests.cfg') 472 self.username = configFile.get('SMBTransport', 'username') 473 self.domain = configFile.get('SMBTransport', 'domain') 474 self.serverName = configFile.get('SMBTransport', 'servername') 475 self.password = configFile.get('SMBTransport', 'password') 476 self.machine = configFile.get('SMBTransport', 'machine') 477 self.hashes = configFile.get('SMBTransport', 'hashes') 478 479 self.stringBinding = r'ncacn_np:%s[\PIPE\lsass]' % self.machine 480 self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0') 481 482class TCPTransport(DRSRTests): 483 def setUp(self): 484 DRSRTests.setUp(self) 485 configFile = ConfigParser.ConfigParser() 486 configFile.read('dcetests.cfg') 487 self.username = configFile.get('TCPTransport', 'username') 488 self.domain = configFile.get('TCPTransport', 'domain') 489 self.serverName = configFile.get('TCPTransport', 'servername') 490 self.password = configFile.get('TCPTransport', 'password') 491 self.machine = configFile.get('TCPTransport', 'machine') 492 self.hashes = configFile.get('TCPTransport', 'hashes') 493 self.stringBinding = epm.hept_map(self.machine, drsuapi.MSRPC_UUID_DRSUAPI, protocol = 'ncacn_ip_tcp') 494 self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0') 495 496class TCPTransport64(DRSRTests): 497 def setUp(self): 498 DRSRTests.setUp(self) 499 configFile = ConfigParser.ConfigParser() 500 configFile.read('dcetests.cfg') 501 self.username = configFile.get('TCPTransport', 'username') 502 self.domain = configFile.get('TCPTransport', 'domain') 503 self.serverName = configFile.get('TCPTransport', 'servername') 504 self.password = configFile.get('TCPTransport', 'password') 505 self.machine = configFile.get('TCPTransport', 'machine') 506 self.hashes = configFile.get('TCPTransport', 'hashes') 507 self.stringBinding = epm.hept_map(self.machine, drsuapi.MSRPC_UUID_DRSUAPI, protocol = 'ncacn_ip_tcp') 508 self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0') 509 510 511# Process command-line arguments. 512if __name__ == '__main__': 513 import sys 514 if len(sys.argv) > 1: 515 testcase = sys.argv[1] 516 suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase]) 517 else: 518 #suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport) 519 suite = unittest.TestLoader().loadTestsFromTestCase(TCPTransport) 520 #suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TCPTransport64)) 521 #suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport64)) 522 unittest.TextTestRunner(verbosity=1).run(suite) 523