1import unittest
2import ConfigParser
3
4from impacket.dcerpc.v5.ndr import NDRCALL
5from impacket.dcerpc.v5 import transport, epm, samr
6from impacket.dcerpc.v5.dtypes import NULL
7from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY, \
8    RPC_C_AUTHN_LEVEL_NONE, RPC_C_AUTHN_GSS_NEGOTIATE, RPC_C_AUTHN_WINNT
9from impacket.dcerpc.v5.dtypes import RPC_UNICODE_STRING
10
11
12# aimed at testing just the DCERPC engine, not the particular
13# endpoints (we should do specific tests for endpoints)
14# here we're using EPM just because we need one, and it's the
15# easiest one
16
17class DCERPCTests(unittest.TestCase):
18    def connectDCE(self, username, password, domain, lm='', nt='', aesKey='', TGT=None, TGS=None, tfragment=0,
19                   dceFragment=0,
20                   auth_type=RPC_C_AUTHN_WINNT, auth_level=RPC_C_AUTHN_LEVEL_NONE, dceAuth=True, doKerberos=False,
21                   bind=epm.MSRPC_UUID_PORTMAP):
22        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
23
24        if hasattr(rpctransport, 'set_credentials'):
25            # This method exists only for selected protocol sequences.
26            rpctransport.set_credentials(username, password, domain, lm, nt, aesKey, TGT, TGS)
27            rpctransport.set_kerberos(doKerberos, kdcHost=self.machine)
28
29        rpctransport.set_max_fragment_size(tfragment)
30        rpctransport.setRemoteName(self.serverName)
31        rpctransport.setRemoteHost(self.machine)
32        dce = rpctransport.get_dce_rpc()
33        dce.set_max_fragment_size(dceFragment)
34        if dceAuth is True:
35            dce.set_credentials(*(rpctransport.get_credentials()))
36        dce.connect()
37        dce.set_auth_type(auth_type)
38        dce.set_auth_level(auth_level)
39        dce.bind(bind)
40
41        return dce
42
43    def test_connection(self):
44        dce = self.connectDCE(self.username, self.password, self.domain, dceAuth=False)
45        dce.disconnect()
46
47    def test_connectionHashes(self):
48        lmhash, nthash = self.hashes.split(':')
49        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceAuth=False)
50        dce.disconnect()
51
52    def test_dceAuth(self):
53        dce = self.connectDCE(self.username, self.password, self.domain, dceAuth=True)
54        resp = epm.hept_lookup(self.machine)
55        dce.disconnect()
56
57    def test_dceAuthKerberos(self):
58        dce = self.connectDCE(self.username, self.password, self.domain, dceAuth=True, doKerberos=True)
59        resp = epm.hept_lookup(self.machine)
60        dce.disconnect()
61
62    def test_dceAuthHasHashes(self):
63        lmhash, nthash = self.hashes.split(':')
64        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceAuth=True)
65        resp = epm.hept_lookup(self.machine)
66        dce.disconnect()
67
68    def test_dceAuthHasHashesKerberos(self):
69        lmhash, nthash = self.hashes.split(':')
70        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceAuth=True, doKerberos=True)
71        resp = epm.hept_lookup(self.machine)
72        dce.disconnect()
73
74    def test_dceAuthHasAes128Kerberos(self):
75        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey128, dceAuth=True, doKerberos=True)
76        resp = epm.hept_lookup(self.machine)
77        dce.disconnect()
78
79    def test_dceAuthHasAes256Kerberos(self):
80        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey256, dceAuth=True, doKerberos=True)
81        resp = epm.hept_lookup(self.machine)
82        dce.disconnect()
83
84    def test_dceTransportFragmentation(self):
85        lmhash, nthash = self.hashes.split(':')
86        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, tfragment=1, dceAuth=True, doKerberos=False)
87        request = epm.ept_lookup()
88        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
89        request['object'] = NULL
90        request['Ifid'] = NULL
91        request['vers_option'] = epm.RPC_C_VERS_ALL
92        request['max_ents'] = 499
93        resp = dce.request(request)
94        dce.disconnect()
95
96    def test_dceFragmentation(self):
97        lmhash, nthash = self.hashes.split(':')
98        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1, dceAuth=True, doKerberos=False)
99        request = epm.ept_lookup()
100        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
101        request['object'] = NULL
102        request['Ifid'] = NULL
103        request['vers_option'] = epm.RPC_C_VERS_ALL
104        request['max_ents'] = 499
105        resp = dce.request(request)
106        dce.disconnect()
107
108    def test_bigRequestMustFragment(self):
109        class dummyCall(NDRCALL):
110            opnum = 2
111            structure = (
112                ('Name', RPC_UNICODE_STRING),
113            )
114        lmhash, nthash = self.hashes.split(':')
115        oldBinding = self.stringBinding
116        self.stringBinding = epm.hept_map(self.machine, samr.MSRPC_UUID_SAMR, protocol = 'ncacn_ip_tcp')
117        print self.stringBinding
118        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=0,
119                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
120                              dceAuth=True,
121                              doKerberos=True, bind=samr.MSRPC_UUID_SAMR)
122        self.stringBinding = oldBinding
123
124        request = samr.SamrConnect()
125        request['ServerName'] = u'BETO\x00'
126        request['DesiredAccess'] = samr.DELETE | samr.READ_CONTROL | samr.WRITE_DAC | samr.WRITE_OWNER | samr.ACCESS_SYSTEM_SECURITY | samr.GENERIC_READ | samr.GENERIC_WRITE | samr.GENERIC_EXECUTE | samr.SAM_SERVER_CONNECT | samr.SAM_SERVER_SHUTDOWN | samr.SAM_SERVER_INITIALIZE | samr.SAM_SERVER_CREATE_DOMAIN | samr.SAM_SERVER_ENUMERATE_DOMAINS | samr.SAM_SERVER_LOOKUP_DOMAIN | samr.SAM_SERVER_READ | samr.SAM_SERVER_WRITE | samr.SAM_SERVER_EXECUTE
127        resp = dce.request(request)
128        request = samr.SamrEnumerateDomainsInSamServer()
129        request['ServerHandle'] = resp['ServerHandle']
130        request['EnumerationContext'] =  0
131        request['PreferedMaximumLength'] = 500
132        resp2 = dce.request(request)
133        try:
134            request = samr.SamrLookupDomainInSamServer()
135            request['ServerHandle'] = resp['ServerHandle']
136            request['Name'] = 'A'*4500
137            resp = dce.request(request)
138        except Exception, e:
139            if str(e).find('STATUS_NO_SUCH_DOMAIN') < 0:
140                raise
141        dce.disconnect()
142
143    def test_dceFragmentationWINNTPacketIntegrity(self):
144        lmhash, nthash = self.hashes.split(':')
145        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
146                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=False)
147        request = epm.ept_lookup()
148        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
149        request['object'] = NULL
150        request['Ifid'] = NULL
151        request['vers_option'] = epm.RPC_C_VERS_ALL
152        request['max_ents'] = 499
153        resp = dce.request(request)
154        dce.disconnect()
155
156    def test_dceFragmentationWINNTPacketPrivacy(self):
157        lmhash, nthash = self.hashes.split(':')
158        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
159                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=False)
160        request = epm.ept_lookup()
161        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
162        request['object'] = NULL
163        request['Ifid'] = NULL
164        request['vers_option'] = epm.RPC_C_VERS_ALL
165        request['max_ents'] = 499
166        resp = dce.request(request)
167        dce.disconnect()
168
169    def test_dceFragmentationKerberosPacketIntegrity(self):
170        lmhash, nthash = self.hashes.split(':')
171        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
172                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
173                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=True)
174        request = epm.ept_lookup()
175        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
176        request['object'] = NULL
177        request['Ifid'] = NULL
178        request['vers_option'] = epm.RPC_C_VERS_ALL
179        request['max_ents'] = 499
180        resp = dce.request(request)
181        dce.disconnect()
182
183    def test_dceFragmentationKerberosPacketPrivacy(self):
184        lmhash, nthash = self.hashes.split(':')
185        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, dceFragment=1,
186                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
187                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=True)
188        request = epm.ept_lookup()
189        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
190        request['object'] = NULL
191        request['Ifid'] = NULL
192        request['vers_option'] = epm.RPC_C_VERS_ALL
193        request['max_ents'] = 499
194        resp = dce.request(request)
195        dce.disconnect()
196
197    def test_WINNTPacketIntegrity(self):
198        dce = self.connectDCE(self.username, self.password, self.domain, auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY,
199                              dceAuth=True, doKerberos=False)
200        request = epm.ept_lookup()
201        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
202        request['object'] = NULL
203        request['Ifid'] = NULL
204        request['vers_option'] = epm.RPC_C_VERS_ALL
205        request['max_ents'] = 499
206        resp = dce.request(request)
207        dce.disconnect()
208
209    def test_KerberosPacketIntegrity(self):
210        dce = self.connectDCE(self.username, self.password, self.domain, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
211                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=True)
212        request = epm.ept_lookup()
213        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
214        request['object'] = NULL
215        request['Ifid'] = NULL
216        request['vers_option'] = epm.RPC_C_VERS_ALL
217        request['max_ents'] = 499
218        resp = dce.request(request)
219        resp = dce.request(request)
220        resp.dump()
221        dce.disconnect()
222
223    def test_HashesWINNTPacketIntegrity(self):
224        lmhash, nthash = self.hashes.split(':')
225        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash,
226                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=False)
227        request = epm.ept_lookup()
228        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
229        request['object'] = NULL
230        request['Ifid'] = NULL
231        request['vers_option'] = epm.RPC_C_VERS_ALL
232        request['max_ents'] = 499
233        resp = dce.request(request)
234        dce.disconnect()
235
236    def test_HashesKerberosPacketIntegrity(self):
237        lmhash, nthash = self.hashes.split(':')
238        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
239                              auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, dceAuth=True, doKerberos=True)
240        request = epm.ept_lookup()
241        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
242        request['object'] = NULL
243        request['Ifid'] = NULL
244        request['vers_option'] = epm.RPC_C_VERS_ALL
245        request['max_ents'] = 499
246        resp = dce.request(request)
247        resp = dce.request(request)
248        resp.dump()
249        dce.disconnect()
250
251    def test_Aes128KerberosPacketIntegrity(self):
252        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey128,
253                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY,
254                              dceAuth=True, doKerberos=True)
255        request = epm.ept_lookup()
256        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
257        request['object'] = NULL
258        request['Ifid'] = NULL
259        request['vers_option'] = epm.RPC_C_VERS_ALL
260        request['max_ents'] = 499
261        resp = dce.request(request)
262        resp = dce.request(request)
263        resp.dump()
264        dce.disconnect()
265
266    def test_Aes256KerberosPacketIntegrity(self):
267        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey256,
268                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY,
269                              dceAuth=True, doKerberos=True)
270        request = epm.ept_lookup()
271        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
272        request['object'] = NULL
273        request['Ifid'] = NULL
274        request['vers_option'] = epm.RPC_C_VERS_ALL
275        request['max_ents'] = 499
276        resp = dce.request(request)
277        resp = dce.request(request)
278        resp.dump()
279        dce.disconnect()
280
281    def test_packetAnonWINNTPacketIntegrity(self):
282        # With SMB Transport this will fail with STATUS_ACCESS_DENIED
283        try:
284            dce = self.connectDCE('', '', '', auth_level=RPC_C_AUTHN_LEVEL_PKT_INTEGRITY,dceAuth=False, doKerberos=False)
285            request = epm.ept_lookup()
286            request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
287            request['object'] = NULL
288            request['Ifid'] = NULL
289            request['vers_option'] = epm.RPC_C_VERS_ALL
290            request['max_ents'] = 499
291            resp = dce.request(request)
292            dce.disconnect()
293        except Exception, e:
294            if not (str(e).find('STATUS_ACCESS_DENIED') >=0 and self.stringBinding.find('ncacn_np') >=0):
295                raise
296
297    def test_WINNTPacketPrivacy(self):
298        dce = self.connectDCE(self.username, self.password, self.domain, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
299                              dceAuth=True, doKerberos=False)
300        request = epm.ept_lookup()
301        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
302        request['object'] = NULL
303        request['Ifid'] = NULL
304        request['vers_option'] = epm.RPC_C_VERS_ALL
305        request['max_ents'] = 499
306        resp = dce.request(request)
307        resp = dce.request(request)
308        dce.disconnect()
309
310    def test_KerberosPacketPrivacy(self):
311        dce = self.connectDCE(self.username, self.password, self.domain, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
312                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=True)
313        request = epm.ept_lookup()
314        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
315        request['object'] = NULL
316        request['Ifid'] = NULL
317        request['vers_option'] = epm.RPC_C_VERS_ALL
318        request['max_ents'] = 499
319        resp = dce.request(request)
320        resp = dce.request(request)
321        resp.dump()
322        dce.disconnect()
323
324    def test_HashesWINNTPacketPrivacy(self):
325        lmhash, nthash = self.hashes.split(':')
326        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
327                              dceAuth=True, doKerberos=False)
328        request = epm.ept_lookup()
329        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
330        request['object'] = NULL
331        request['Ifid'] = NULL
332        request['vers_option'] = epm.RPC_C_VERS_ALL
333        request['max_ents'] = 499
334        resp = dce.request(request)
335        dce.disconnect()
336
337    def test_HashesKerberosPacketPrivacy(self):
338        lmhash, nthash = self.hashes.split(':')
339        dce = self.connectDCE(self.username, '', self.domain, lmhash, nthash, auth_type=RPC_C_AUTHN_GSS_NEGOTIATE,
340                              auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY, dceAuth=True, doKerberos=True)
341        request = epm.ept_lookup()
342        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
343        request['object'] = NULL
344        request['Ifid'] = NULL
345        request['vers_option'] = epm.RPC_C_VERS_ALL
346        request['max_ents'] = 499
347        resp = dce.request(request)
348        resp = dce.request(request)
349        resp.dump()
350        dce.disconnect()
351
352    def test_Aes128KerberosPacketPrivacy(self):
353        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey128,
354                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
355                              dceAuth=True, doKerberos=True)
356        request = epm.ept_lookup()
357        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
358        request['object'] = NULL
359        request['Ifid'] = NULL
360        request['vers_option'] = epm.RPC_C_VERS_ALL
361        request['max_ents'] = 499
362        resp = dce.request(request)
363        resp = dce.request(request)
364        resp.dump()
365        dce.disconnect()
366
367    def test_Aes256KerberosPacketPrivacy(self):
368        dce = self.connectDCE(self.username, '', self.domain, '', '', self.aesKey256,
369                              auth_type=RPC_C_AUTHN_GSS_NEGOTIATE, auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,
370                              dceAuth=True, doKerberos=True)
371        request = epm.ept_lookup()
372        request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
373        request['object'] = NULL
374        request['Ifid'] = NULL
375        request['vers_option'] = epm.RPC_C_VERS_ALL
376        request['max_ents'] = 499
377        resp = dce.request(request)
378        resp = dce.request(request)
379        resp.dump()
380        dce.disconnect()
381
382    def test_AnonWINNTPacketPrivacy(self):
383        # With SMB Transport this will fail with STATUS_ACCESS_DENIED
384        try:
385            dce = self.connectDCE('', '', '', auth_level=RPC_C_AUTHN_LEVEL_PKT_PRIVACY,dceAuth=False, doKerberos=False)
386            request = epm.ept_lookup()
387            request['inquiry_type'] = epm.RPC_C_EP_ALL_ELTS
388            request['object'] = NULL
389            request['Ifid'] = NULL
390            request['vers_option'] = epm.RPC_C_VERS_ALL
391            request['max_ents'] = 499
392            resp = dce.request(request)
393            dce.disconnect()
394        except Exception, e:
395            if not (str(e).find('STATUS_ACCESS_DENIED') >=0 and self.stringBinding.find('ncacn_np') >=0):
396                raise
397
398class TCPTransport(DCERPCTests):
399    def setUp(self):
400        DCERPCTests.setUp(self)
401        # Put specific configuration for target machine with SMB1
402        configFile = ConfigParser.ConfigParser()
403        configFile.read('dcetests.cfg')
404        self.username = configFile.get('TCPTransport', 'username')
405        self.domain   = configFile.get('TCPTransport', 'domain')
406        self.serverName = configFile.get('TCPTransport', 'servername')
407        self.password = configFile.get('TCPTransport', 'password')
408        self.machine  = configFile.get('TCPTransport', 'machine')
409        self.hashes   = configFile.get('TCPTransport', 'hashes')
410        self.aesKey256= configFile.get('TCPTransport', 'aesKey256')
411        self.aesKey128= configFile.get('TCPTransport', 'aesKey128')
412        self.stringBinding = r'ncacn_ip_tcp:%s' % self.machine
413
414class SMBTransport(DCERPCTests):
415    def setUp(self):
416        # Put specific configuration for target machine with SMB_002
417        DCERPCTests.setUp(self)
418        configFile = ConfigParser.ConfigParser()
419        configFile.read('dcetests.cfg')
420        self.username = configFile.get('SMBTransport', 'username')
421        self.domain   = configFile.get('SMBTransport', 'domain')
422        self.serverName = configFile.get('SMBTransport', 'servername')
423        self.password = configFile.get('SMBTransport', 'password')
424        self.machine  = configFile.get('SMBTransport', 'machine')
425        self.hashes   = configFile.get('SMBTransport', 'hashes')
426        self.aesKey256= configFile.get('SMBTransport', 'aesKey256')
427        self.aesKey128= configFile.get('SMBTransport', 'aesKey128')
428        self.stringBinding = r'ncacn_np:%s[\pipe\epmapper]' % self.machine
429
430if __name__ == "__main__":
431    import sys
432    if len(sys.argv) > 1:
433        testcase = sys.argv[1]
434        suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase])
435    else:
436        suite = unittest.TestLoader().loadTestsFromTestCase(TCPTransport)
437        suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport))
438    unittest.TextTestRunner(verbosity=1).run(suite)
439