1import ConfigParser
2import logging
3import os
4import unittest
5
6from impacket.examples.secretsdump import LocalOperations, RemoteOperations, SAMHashes, LSASecrets, NTDSHashes
7from impacket.smbconnection import SMBConnection
8
9def _print_helper(*args, **kwargs):
10    try:
11        print args[-1]
12    except UnicodeError:
13        pass
14
15class DumpSecrets:
16    def __init__(self, remoteName, username='', password='', domain='', options=None):
17        self.__useVSSMethod = options.use_vss
18        self.__remoteName = remoteName
19        self.__remoteHost = options.target_ip
20        self.__username = username
21        self.__password = password
22        self.__domain = domain
23        self.__lmhash = ''
24        self.__nthash = ''
25        self.__aesKey = options.aesKey
26        self.__smbConnection = None
27        self.__remoteOps = None
28        self.__SAMHashes = None
29        self.__NTDSHashes = None
30        self.__LSASecrets = None
31        self.__systemHive = options.system
32        self.__bootkey = options.bootkey
33        self.__securityHive = options.security
34        self.__samHive = options.sam
35        self.__ntdsFile = options.ntds
36        self.__history = options.history
37        self.__noLMHash = True
38        self.__isRemote = True
39        self.__outputFileName = options.outputfile
40        self.__doKerberos = options.k
41        self.__justDC = options.just_dc
42        self.__justDCNTLM = options.just_dc_ntlm
43        self.__justUser = options.just_dc_user
44        self.__pwdLastSet = options.pwd_last_set
45        self.__printUserStatus= options.user_status
46        self.__resumeFileName = options.resumefile
47        self.__canProcessSAMLSA = True
48        self.__kdcHost = options.dc_ip
49        self.__options = options
50
51        if options.hashes is not None:
52            self.__lmhash, self.__nthash = options.hashes.split(':')
53
54    def connect(self):
55        self.__smbConnection = SMBConnection(self.__remoteName, self.__remoteHost)
56        if self.__doKerberos:
57            self.__smbConnection.kerberosLogin(self.__username, self.__password, self.__domain, self.__lmhash,
58                                               self.__nthash, self.__aesKey, self.__kdcHost)
59        else:
60            self.__smbConnection.login(self.__username, self.__password, self.__domain, self.__lmhash, self.__nthash)
61
62    def dump(self):
63        try:
64            if self.__remoteName.upper() == 'LOCAL' and self.__username == '':
65                self.__isRemote = False
66                self.__useVSSMethod = True
67                if self.__systemHive:
68                    localOperations = LocalOperations(self.__systemHive)
69                    bootKey = localOperations.getBootKey()
70                    if self.__ntdsFile is not None:
71                    # Let's grab target's configuration about LM Hashes storage
72                        self.__noLMHash = localOperations.checkNoLMHashPolicy()
73                else:
74                    import binascii
75                    bootKey = binascii.unhexlify(self.__bootkey)
76
77            else:
78                self.__isRemote = True
79                bootKey = None
80                try:
81                    try:
82                        self.connect()
83                    except Exception, e:
84                        if os.getenv('KRB5CCNAME') is not None and self.__doKerberos is True:
85                            # SMBConnection failed. That might be because there was no way to log into the
86                            # target system. We just have a last resort. Hope we have tickets cached and that they
87                            # will work
88                            logging.debug('SMBConnection didn\'t work, hoping Kerberos will help (%s)' % str(e))
89                            pass
90                        else:
91                            raise
92
93                    self.__remoteOps  = RemoteOperations(self.__smbConnection, self.__doKerberos, self.__kdcHost)
94                    self.__remoteOps.setExecMethod(self.__options.exec_method)
95                    if self.__justDC is False and self.__justDCNTLM is False or self.__useVSSMethod is True:
96                        self.__remoteOps.enableRegistry()
97                        bootKey             = self.__remoteOps.getBootKey()
98                        # Let's check whether target system stores LM Hashes
99                        self.__noLMHash = self.__remoteOps.checkNoLMHashPolicy()
100                except Exception, e:
101                    self.__canProcessSAMLSA = False
102                    if str(e).find('STATUS_USER_SESSION_DELETED') and os.getenv('KRB5CCNAME') is not None \
103                        and self.__doKerberos is True:
104                        # Giving some hints here when SPN target name validation is set to something different to Off
105                        # This will prevent establishing SMB connections using TGS for SPNs different to cifs/
106                        logging.error('Policy SPN target name validation might be restricting full DRSUAPI dump. Try -just-dc-user')
107                    else:
108                        logging.error('RemoteOperations failed: %s' % str(e))
109
110            # If RemoteOperations succeeded, then we can extract SAM and LSA
111            if self.__justDC is False and self.__justDCNTLM is False and self.__canProcessSAMLSA:
112                try:
113                    if self.__isRemote is True:
114                        SAMFileName         = self.__remoteOps.saveSAM()
115                    else:
116                        SAMFileName         = self.__samHive
117
118                    self.__SAMHashes    = SAMHashes(SAMFileName, bootKey, isRemote = self.__isRemote, perSecretCallback=_print_helper)
119                    self.__SAMHashes.dump()
120                    if self.__outputFileName is not None:
121                        self.__SAMHashes.export(self.__outputFileName)
122                except Exception, e:
123                    logging.error('SAM hashes extraction failed: %s' % str(e))
124
125                try:
126                    if self.__isRemote is True:
127                        SECURITYFileName = self.__remoteOps.saveSECURITY()
128                    else:
129                        SECURITYFileName = self.__securityHive
130
131                    self.__LSASecrets = LSASecrets(SECURITYFileName, bootKey, self.__remoteOps,
132                                                   isRemote=self.__isRemote, history=self.__history, perSecretCallback=_print_helper)
133                    self.__LSASecrets.dumpCachedHashes()
134                    if self.__outputFileName is not None:
135                        self.__LSASecrets.exportCached(self.__outputFileName)
136                    self.__LSASecrets.dumpSecrets()
137                    if self.__outputFileName is not None:
138                        self.__LSASecrets.exportSecrets(self.__outputFileName)
139                except Exception, e:
140                    if logging.getLogger().level == logging.DEBUG:
141                        import traceback
142                        traceback.print_exc()
143                    logging.error('LSA hashes extraction failed: %s' % str(e))
144
145            # NTDS Extraction we can try regardless of RemoteOperations failing. It might still work
146            if self.__isRemote is True:
147                if self.__useVSSMethod and self.__remoteOps is not None:
148                    NTDSFileName = self.__remoteOps.saveNTDS()
149                else:
150                    NTDSFileName = None
151            else:
152                NTDSFileName = self.__ntdsFile
153
154            self.__NTDSHashes = NTDSHashes(NTDSFileName, bootKey, isRemote=self.__isRemote, history=self.__history,
155                                           noLMHash=self.__noLMHash, remoteOps=self.__remoteOps,
156                                           useVSSMethod=self.__useVSSMethod, justNTLM=self.__justDCNTLM,
157                                           pwdLastSet=self.__pwdLastSet, resumeSession=self.__resumeFileName,
158                                           outputFileName=self.__outputFileName, justUser=self.__justUser,
159                                           printUserStatus= self.__printUserStatus, perSecretCallback=_print_helper)
160            try:
161                self.__NTDSHashes.dump()
162            except Exception, e:
163                if logging.getLogger().level == logging.DEBUG:
164                    import traceback
165                    traceback.print_exc()
166                if str(e).find('ERROR_DS_DRA_BAD_DN') >= 0:
167                    # We don't store the resume file if this error happened, since this error is related to lack
168                    # of enough privileges to access DRSUAPI.
169                    resumeFile = self.__NTDSHashes.getResumeSessionFile()
170                    if resumeFile is not None:
171                        os.unlink(resumeFile)
172                logging.error(e)
173                if self.__justUser and str(e).find("ERROR_DS_NAME_ERROR_NOT_UNIQUE") >=0:
174                    logging.info("You just got that error because there might be some duplicates of the same name. "
175                                 "Try specifying the domain name for the user as well. It is important to specify it "
176                                 "in the form of NetBIOS domain name/user (e.g. contoso/Administratror).")
177                elif self.__useVSSMethod is False:
178                    logging.info('Something wen\'t wrong with the DRSUAPI approach. Try again with -use-vss parameter')
179            self.cleanup()
180        except (Exception, KeyboardInterrupt), e:
181            if logging.getLogger().level == logging.DEBUG:
182                import traceback
183                traceback.print_exc()
184            logging.error(e)
185            if self.__NTDSHashes is not None:
186                if isinstance(e, KeyboardInterrupt):
187                    while True:
188                        answer =  raw_input("Delete resume session file? [y/N] ")
189                        if answer.upper() == '':
190                            answer = 'N'
191                            break
192                        elif answer.upper() == 'Y':
193                            answer = 'Y'
194                            break
195                        elif answer.upper() == 'N':
196                            answer = 'N'
197                            break
198                    if answer == 'Y':
199                        resumeFile = self.__NTDSHashes.getResumeSessionFile()
200                        if resumeFile is not None:
201                            os.unlink(resumeFile)
202            try:
203                self.cleanup()
204            except:
205                pass
206
207    def cleanup(self):
208        logging.info('Cleaning up... ')
209        if self.__remoteOps:
210            self.__remoteOps.finish()
211        if self.__SAMHashes:
212            self.__SAMHashes.finish()
213        if self.__LSASecrets:
214            self.__LSASecrets.finish()
215        if self.__NTDSHashes:
216            self.__NTDSHashes.finish()
217
218class Options(object):
219    aesKey=None
220    bootkey=None
221    dc_ip=None
222    debug=False
223    exec_method='smbexec'
224    hashes=None
225    history=False
226    just_dc=False
227    just_dc_ntlm=False
228    just_dc_user=None
229    k=False
230    no_pass=False
231    ntds=None
232    outputfile=None
233    pwd_last_set=False
234    resumefile=None
235    sam=None
236    security=None
237    system=None
238    target=''
239    target_ip=''
240    use_vss=False
241    user_status=False
242
243class SecretsDumpTests(unittest.TestCase):
244    def test_VSS_History(self):
245        options = Options()
246        options.target_ip = self.machine
247        options.use_vss = True
248        options.history = True
249        dumper = DumpSecrets(self.serverName, self.username, self.password, self.domain, options)
250        dumper.dump()
251
252    def test_VSS_WMI(self):
253        options = Options()
254        options.target_ip = self.machine
255        options.use_vss = True
256        options.exec_method='wmiexec'
257        dumper = DumpSecrets(self.serverName, self.username, self.password, self.domain, options)
258        dumper.dump()
259
260    def test_DRSUAPI_DC_USER(self):
261        options = Options()
262        options.target_ip = self.machine
263        options.use_vss = False
264        options.just_dc = True
265        options.just_dc_user = '%s/%s' % (self.domain.split('.')[0], 'Administrator')
266        dumper = DumpSecrets(self.serverName, self.username, self.password, self.domain, options)
267        dumper.dump()
268
269    def test_VSS_MMC(self):
270        options = Options()
271        options.target_ip = self.machine
272        options.use_vss = True
273        options.exec_method='mmcexec'
274        dumper = DumpSecrets(self.serverName, self.username, self.password, self.domain, options)
275        dumper.dump()
276
277    def test_DRSUAPI(self):
278        options = Options()
279        options.target_ip = self.machine
280        options.use_vss = False
281        dumper = DumpSecrets(self.serverName, self.username, self.password, self.domain, options)
282        dumper.dump()
283
284
285
286class Tests(SecretsDumpTests):
287    def setUp(self):
288        SecretsDumpTests.setUp(self)
289        # Put specific configuration for target machine with SMB1
290        configFile = ConfigParser.ConfigParser()
291        configFile.read('dcetests.cfg')
292        self.username = configFile.get('SMBTransport', 'username')
293        self.domain   = configFile.get('SMBTransport', 'domain')
294        self.serverName = configFile.get('SMBTransport', 'servername')
295        self.password = configFile.get('SMBTransport', 'password')
296        self.machine  = configFile.get('SMBTransport', 'machine')
297        self.hashes   = configFile.get('SMBTransport', 'hashes')
298        self.aesKey   = configFile.get('SMBTransport', 'aesKey128')
299
300if __name__ == "__main__":
301    suite = unittest.TestLoader().loadTestsFromTestCase(Tests)
302    unittest.TextTestRunner(verbosity=1).run(suite)
303