1###############################################################################
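#  Tested so far:
#
#  LDAPConnection.login (password, hashes, sicilyPackageDiscovery)
#  LDAPConnection.kerberosLogin (password, hashes, AES key)
#  LDAPConnection.search
#  SR_SECURITY_DESCRIPTOR parsing of nTSecurityDescriptor values
#
#  Not yet:
#
# Shouldn't dump errors against a win7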
9#
10################################################################################
11
12import unittest
13import ConfigParser
14
15from impacket.ldap import ldap, ldapasn1
16import impacket.ldap.ldaptypes
17from impacket.ldap.ldaptypes import SR_SECURITY_DESCRIPTOR
18
class LDAPTests(unittest.TestCase):
    """LDAP client tests that run against a live server.

    The tests read ``url``, ``baseDN``, ``username``, ``password``, ``domain``,
    ``hashes`` and ``aesKey`` from the instance; subclasses are expected to
    set them in ``setUp``.
    """

    def _splitHashes(self):
        """Return ``(lmhash, nthash)`` parsed from ``self.hashes``.

        ``self.hashes`` is split on ``':'`` into exactly two parts; an empty
        value yields two empty strings.
        """
        if not self.hashes:
            return '', ''
        lmhash, nthash = self.hashes.split(':')
        return lmhash, nthash

    def dummySearch(self, ldapConnection):
        """Search for entries with a servicePrincipalName and print each result."""
        # Let's do a search just to be sure it's working
        searchFilter = '(servicePrincipalName=*)'

        resp = ldapConnection.search(searchFilter=searchFilter,
                                     attributes=['servicePrincipalName', 'sAMAccountName', 'userPrincipalName',
                                                 'MemberOf', 'pwdLastSet', 'whenCreated'])
        for item in resp:
            # Single-argument parenthesised print behaves the same as the print statement
            print(item.prettyPrint())

    def test_security_descriptor(self):
        """Parse every computer's nTSecurityDescriptor and check it re-serialises unchanged."""
        # Comment by @dirkjanm:
        # To prevent false negatives in the test case, impacket.ldap.ldaptypes.RECALC_ACL_SIZE should be set to False
        # in tests, since sometimes Windows has redundant null bytes after an ACE. Stripping those away makes the
        # ACLs not match at a binary level.
        impacket.ldap.ldaptypes.RECALC_ACL_SIZE = False
        ldapConnection = self.connect()
        searchFilter = '(objectCategory=computer)'

        resp = ldapConnection.search(searchFilter=searchFilter,
                                     attributes=['nTSecurityDescriptor'])
        for item in resp:
            # Only SearchResultEntry items are inspected; anything else is skipped
            if not isinstance(item, ldapasn1.SearchResultEntry):
                continue
            for attribute in item['attributes']:
                if attribute['type'] == 'nTSecurityDescriptor':
                    secDesc = str(attribute['vals'][0])
                    # Converting it so we can use it
                    sd = SR_SECURITY_DESCRIPTOR()
                    sd.fromString(secDesc)
                    sd.dump()
                    # Must be an equality check: assertTrue(a, b) would treat b as the
                    # failure message and pass for any non-empty descriptor.
                    self.assertEqual(secDesc, sd.getData())

    def connect(self):
        """Return a new LDAPConnection logged in with the configured username and password."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(self.username, self.password)
        return ldapConnection

    def test_sicily(self):
        """Log in with the 'sicilyPackageDiscovery' authentication choice."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(authenticationChoice='sicilyPackageDiscovery')

    def test_sicilyNtlm(self):
        """Log in with user, password and domain, then run a search."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(user=self.username, password=self.password, domain=self.domain)

        self.dummySearch(ldapConnection)

    def test_kerberosLogin(self):
        """Kerberos login with the password, then run a search."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.kerberosLogin(self.username, self.password, self.domain)

        self.dummySearch(ldapConnection)

    def test_kerberosLoginHashes(self):
        """Kerberos login with the LM/NT hashes and an empty password, then run a search."""
        lmhash, nthash = self._splitHashes()
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.kerberosLogin(self.username, '', self.domain, lmhash, nthash, '', None, None)

        self.dummySearch(ldapConnection)

    def test_kerberosLoginKeys(self):
        """Kerberos login with the AES key and an empty password, then run a search."""
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.kerberosLogin(self.username, '', self.domain, '', '', self.aesKey, None, None)

        self.dummySearch(ldapConnection)

    def test_sicilyNtlmHashes(self):
        """Log in passing both the password and the LM/NT hashes, then run a search."""
        lmhash, nthash = self._splitHashes()
        ldapConnection = ldap.LDAPConnection(self.url, self.baseDN)
        ldapConnection.login(user=self.username, password=self.password, domain=self.domain,
                             lmhash=lmhash, nthash=nthash)

        self.dummySearch(ldapConnection)

    def test_search(self):
        """Connect with the default login and run a search."""
        ldapConnection = self.connect()

        self.dummySearch(ldapConnection)
107
class TCPTransport(LDAPTests):
    """Run the LDAP tests against an ``ldap://`` URL."""

    def setUp(self):
        """Load the connection settings from ``dcetests.cfg`` and derive the URL and base DN."""
        LDAPTests.setUp(self)
        configFile = ConfigParser.ConfigParser()
        configFile.read('dcetests.cfg')
        self.username = configFile.get('TCPTransport', 'username')
        self.domain   = configFile.get('TCPTransport', 'domain')
        self.serverName = configFile.get('TCPTransport', 'servername')
        self.password = configFile.get('TCPTransport', 'password')
        self.machine  = configFile.get('TCPTransport', 'machine')
        self.hashes   = configFile.get('TCPTransport', 'hashes')
        # NOTE(review): read from the [SMBTransport] section, unlike the other settings -- confirm this is intended
        self.aesKey = configFile.get('SMBTransport', 'aesKey128')
        self.url      = 'ldap://%s' % self.serverName
        # One "dc=" component per label of the domain, so a domain with any number of
        # labels produces a complete base DN (identical output for two-label domains).
        self.baseDN   = ', '.join(['dc=%s' % label for label in self.domain.split('.')])
122
class TCPTransportSSL(LDAPTests):
    """Run the LDAP tests against an ``ldaps://`` URL."""

    def setUp(self):
        """Load the connection settings from ``dcetests.cfg`` and derive the URL and base DN."""
        LDAPTests.setUp(self)
        configFile = ConfigParser.ConfigParser()
        configFile.read('dcetests.cfg')
        self.username = configFile.get('TCPTransport', 'username')
        self.domain = configFile.get('TCPTransport', 'domain')
        self.serverName = configFile.get('TCPTransport', 'servername')
        self.password = configFile.get('TCPTransport', 'password')
        self.machine = configFile.get('TCPTransport', 'machine')
        self.hashes = configFile.get('TCPTransport', 'hashes')
        # NOTE(review): read from the [SMBTransport] section, unlike the other settings -- confirm this is intended
        self.aesKey = configFile.get('SMBTransport', 'aesKey128')
        self.url      = 'ldaps://%s' % self.serverName
        # One "dc=" component per label of the domain, so a domain with any number of
        # labels produces a complete base DN (identical output for two-label domains).
        self.baseDN   = ', '.join(['dc=%s' % label for label in self.domain.split('.')])
137
# Process command-line arguments.
# An optional first argument names the test case class to run; the default is TCPTransport.
if __name__ == '__main__':
    import sys
    if len(sys.argv) > 1:
        testcase = sys.argv[1]
        suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase])
    else:
        suite = unittest.TestLoader().loadTestsFromTestCase(TCPTransport)
    result = unittest.TextTestRunner(verbosity=1).run(suite)
    # Report failures through the exit status so callers (shell scripts, CI) can detect them
    sys.exit(0 if result.wasSuccessful() else 1)