1###############################################################################
2#  Tested so far:
3#
4#
5#  Not yet:
6#
7#
8# Shouldn't dump errors against a win7
9#
10################################################################################
11
12import unittest
13import ConfigParser
14
15from impacket.dcerpc.v5 import transport
16from impacket.dcerpc.v5 import mimilib, epm
17from impacket.dcerpc.v5.dtypes import NULL, MAXIMUM_ALLOWED, OWNER_SECURITY_INFORMATION
18from impacket.winregistry import hexdump
19from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY
20
21
22class RRPTests(unittest.TestCase):
23    def connect(self):
24        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
25        rpctransport.set_connect_timeout(30000)
26        if len(self.hashes) > 0:
27            lmhash, nthash = self.hashes.split(':')
28        else:
29            lmhash = ''
30            nthash = ''
31        #if hasattr(rpctransport, 'set_credentials'):
32            # This method exists only for selected protocol sequences.
33        #    rpctransport.set_credentials(self.username,self.password, self.domain, lmhash, nthash)
34        dce = rpctransport.get_dce_rpc()
35        #dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY)
36        dce.connect()
37        dce.bind(mimilib.MSRPC_UUID_MIMIKATZ, transfer_syntax = self.ts)
38        dh = mimilib.MimiDiffeH()
39        blob = mimilib.PUBLICKEYBLOB()
40        blob['y'] = dh.genPublicKey()[::-1]
41        request = mimilib.MimiBind()
42        request['clientPublicKey']['sessionType'] = mimilib.CALG_RC4
43        request['clientPublicKey']['cbPublicKey'] = 144
44        request['clientPublicKey']['pbPublicKey'] = str(blob)
45        resp = dce.request(request)
46        blob = mimilib.PUBLICKEYBLOB(''.join(resp['serverPublicKey']['pbPublicKey']))
47        key = dh.getSharedSecret(''.join(blob['y'])[::-1])
48        pHandle = resp['phMimi']
49
50        return dce, rpctransport, pHandle, key[-16:]
51
52    def test_MimiBind(self):
53        dce, rpctransport, pHandle, key = self.connect()
54        dh = mimilib.MimiDiffeH()
55        print 'Our Public'
56        print '='*80
57        hexdump(dh.genPublicKey())
58
59        blob = mimilib.PUBLICKEYBLOB()
60        blob['y'] = dh.genPublicKey()[::-1]
61        request = mimilib.MimiBind()
62        request['clientPublicKey']['sessionType'] = mimilib.CALG_RC4
63        request['clientPublicKey']['cbPublicKey'] = 144
64        request['clientPublicKey']['pbPublicKey'] = str(blob)
65
66        resp = dce.request(request)
67        blob = mimilib.PUBLICKEYBLOB(''.join(resp['serverPublicKey']['pbPublicKey']))
68        print '='*80
69        print 'Server Public'
70        hexdump(''.join(blob['y']))
71        print '='*80
72        print 'Shared'
73        hexdump(dh.getSharedSecret(''.join(blob['y'])[::-1]))
74        resp.dump()
75
76    def test_MimiCommand(self):
77        dce, rpctransport, pHandle, key = self.connect()
78        from Crypto.Cipher import ARC4
79        cipher = ARC4.new(key[::-1])
80        command = cipher.encrypt('token::whoami\x00'.encode('utf-16le'))
81        #command = cipher.encrypt('sekurlsa::logonPasswords\x00'.encode('utf-16le'))
82        #command = cipher.encrypt('process::imports\x00'.encode('utf-16le'))
83        request = mimilib.MimiCommand()
84        request['phMimi'] = pHandle
85        request['szEncCommand'] = len(command)
86        request['encCommand'] = list(command)
87        resp = dce.request(request)
88        cipherText = ''.join(resp['encResult'])
89        cipher = ARC4.new(key[::-1])
90        plain = cipher.decrypt(cipherText)
91        print '='*80
92        print plain
93        #resp.dump()
94
95    def test_MimiUnBind(self):
96        dce, rpctransport, pHandle, key = self.connect()
97        command = 'token::whoami\x00'
98        request = mimilib.MimiUnbind()
99        request['phMimi'] = pHandle
100        hexdump(str(request))
101        resp = dce.request(request)
102        resp.dump()
103
104class TCPTransport(RRPTests):
105    def setUp(self):
106        RRPTests.setUp(self)
107        configFile = ConfigParser.ConfigParser()
108        configFile.read('dcetests.cfg')
109        self.username = configFile.get('TCPTransport', 'username')
110        self.domain   = configFile.get('TCPTransport', 'domain')
111        self.serverName = configFile.get('TCPTransport', 'servername')
112        self.password = configFile.get('TCPTransport', 'password')
113        self.machine  = configFile.get('TCPTransport', 'machine')
114        self.hashes   = configFile.get('TCPTransport', 'hashes')
115        self.stringBinding = epm.hept_map(self.machine, mimilib.MSRPC_UUID_MIMIKATZ, protocol = 'ncacn_ip_tcp')
116        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
117
118
119# Process command-line arguments.
120if __name__ == '__main__':
121    import sys
122    if len(sys.argv) > 1:
123        testcase = sys.argv[1]
124        suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase])
125    else:
126        suite = unittest.TestLoader().loadTestsFromTestCase(TCPTransport)
127        #suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport64))
128    unittest.TextTestRunner(verbosity=1).run(suite)
129