###############################################################################
#  Tested so far:
#
# LDAP / LDAPS binds: Sicily package discovery, Sicily NTLM (password and
#   hashes), Kerberos (password, hashes and AES key)
# Search
# nTSecurityDescriptor parse / re-serialize round trip
#
#  Not yet:
#
# Shouldn't dump errors against a win7
#
################################################################################

import unittest
try:
    import ConfigParser
except ImportError:
    # Python 3 renamed the module; keep the Python 2 name used below.
    import configparser as ConfigParser

from impacket.ldap import ldap, ldapasn1
import impacket.ldap.ldaptypes
from impacket.ldap.ldaptypes import SR_SECURITY_DESCRIPTOR


class LDAPTests(unittest.TestCase):
    """LDAP tests shared by every transport.

    The tests read ``url``, ``baseDN``, ``username``, ``password``, ``domain``,
    ``hashes`` and ``aesKey`` from the instance; the transport subclasses below
    set them in ``setUp`` from ``dcetests.cfg``.
    """

    def _loadConfig(self, scheme):
        """Fill in the connection attributes from ``dcetests.cfg``.

        :param scheme: URL scheme (``'ldap'`` or ``'ldaps'``) put in front of
            the configured server name to build ``self.url``.
        """
        configFile = ConfigParser.ConfigParser()
        configFile.read('dcetests.cfg')
        self.username = configFile.get('TCPTransport', 'username')
        self.domain = configFile.get('TCPTransport', 'domain')
        self.serverName = configFile.get('TCPTransport', 'servername')
        self.password = configFile.get('TCPTransport', 'password')
        self.machine = configFile.get('TCPTransport', 'machine')
        self.hashes = configFile.get('TCPTransport', 'hashes')
        # NOTE(review): the AES key comes from the SMBTransport section while
        # everything else comes from TCPTransport -- kept as-is, confirm intended.
        self.aesKey = configFile.get('SMBTransport', 'aesKey128')
        self.url = '%s://%s' % (scheme, self.serverName)
        # One "dc=" component per DNS label, so domains with more than two
        # labels (e.g. corp.example.com) get a complete base DN.
        self.baseDN = ', '.join('dc=%s' % label for label in self.domain.split('.'))

    def _splitHashes(self):
        """Return ``(lmhash, nthash)`` parsed from the ``LM:NT`` string in
        ``self.hashes``, or two empty strings when no hashes are configured."""
        if self.hashes:
            lmhash, nthash = self.hashes.split(':')
            return lmhash, nthash
        return '', ''

    def dummySearch(self, ldapConnection):
        """Run a simple search over ``ldapConnection`` and print every item
        returned, just to be sure the connection is working."""
        searchFilter = '(servicePrincipalName=*)'

        resp = ldapConnection.search(searchFilter=searchFilter,
                                     attributes=['servicePrincipalName', 'sAMAccountName', 'userPrincipalName',
                                                 'MemberOf', 'pwdLastSet', 'whenCreated'])
        for item in resp:
            # Parenthesized form prints the same text on Python 2 and also
            # parses on Python 3.
            print(item.prettyPrint())

    def test_security_descriptor(self):
        """Parse each computer object's nTSecurityDescriptor and check that
        re-serializing it yields the original value."""
        # Comment by @dirkjanm:
        # To prevent false negatives in the test case, impacket.ldap.ldaptypes.RECALC_ACL_SIZE should be set to False
        # in tests, since sometimes Windows has redundant null bytes after an ACE. Stripping those away makes the
        # ACLs not match at a binary level.
        ldaptypes = impacket.ldap.ldaptypes
        # The flag is module-global: put the previous value back once this
        # test finishes so it does not leak into whatever runs afterwards.
        self.addCleanup(setattr, ldaptypes, 'RECALC_ACL_SIZE', ldaptypes.RECALC_ACL_SIZE)
        ldaptypes.RECALC_ACL_SIZE = False
        ldapConnection = self.connect()
        searchFilter = '(objectCategory=computer)'

        resp = ldapConnection.search(searchFilter=searchFilter,
                                     attributes=['nTSecurityDescriptor'])
        for item in resp:
            if not isinstance(item, ldapasn1.SearchResultEntry):
                continue
            for attribute in item['attributes']:
                if attribute['type'] == 'nTSecurityDescriptor':
                    secDesc = str(attribute['vals'][0])
                    # Converting it so we can use it
                    sd = SR_SECURITY_DESCRIPTOR()
                    sd.fromString(secDesc)
                    sd.dump()
                    # assertTrue(a, b) would treat b as the failure message and
                    # pass for any non-empty a; compare the two values instead.
                    self.assertEqual(secDesc, sd.getData())

    def connect(self):
        """Return a connection to ``self.url`` logged in with the configured
        username and password."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(self.username, self.password)
        return ldapConnection

    def test_sicily(self):
        """Log in with the 'sicilyPackageDiscovery' authentication choice."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(authenticationChoice='sicilyPackageDiscovery')

    def test_sicilyNtlm(self):
        """Log in with username, password and domain, then search."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(user=self.username, password=self.password, domain=self.domain)

        self.dummySearch(ldapConnection)

    def test_kerberosLogin(self):
        """Kerberos login with username, password and domain, then search."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.kerberosLogin(self.username, self.password, self.domain)

        self.dummySearch(ldapConnection)

    def test_kerberosLoginHashes(self):
        """Kerberos login with an empty password and the configured LM/NT
        hashes, then search."""
        lmhash, nthash = self._splitHashes()
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.kerberosLogin(self.username, '', self.domain, lmhash, nthash, '', None, None)

        self.dummySearch(ldapConnection)

    def test_kerberosLoginKeys(self):
        """Kerberos login with an empty password and hashes but the configured
        AES key, then search."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.kerberosLogin(self.username, '', self.domain, '', '', self.aesKey, None, None)

        self.dummySearch(ldapConnection)

    def test_sicilyNtlmHashes(self):
        """Log in passing the configured LM/NT hashes alongside the password,
        then search."""
        lmhash, nthash = self._splitHashes()
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(user=self.username, password=self.password, domain=self.domain,
                             lmhash=lmhash, nthash=nthash)

        self.dummySearch(ldapConnection)

    def test_search(self):
        """Connect with the default login and run the sanity search."""
        ldapConnection = self.connect()

        self.dummySearch(ldapConnection)


class TCPTransport(LDAPTests):
    """Runs the LDAP tests against an ``ldap://`` URL."""

    def setUp(self):
        LDAPTests.setUp(self)
        self._loadConfig('ldap')


class TCPTransportSSL(LDAPTests):
    """Runs the LDAP tests against an ``ldaps://`` URL."""

    def setUp(self):
        LDAPTests.setUp(self)
        self._loadConfig('ldaps')


# Process command-line arguments.
if __name__ == '__main__':
    import sys

    # An optional first argument names the test case class (looked up among
    # this module's globals) to run; without it TCPTransport is used.
    selected = globals()[sys.argv[1]] if len(sys.argv) > 1 else TCPTransport
    suite = unittest.TestLoader().loadTestsFromTestCase(selected)
    runner = unittest.TextTestRunner(verbosity=1)
    runner.run(suite)