1###############################################################################
2#  Tested so far:
3#
4# Since DCOM is more high level, I'll always use the helper classes
5# ServerAlive
6# ServerAlive2
7# ComplexPing
8# SimplePing
9# RemoteCreateInstance
10# ResolveOxid
11# ResolveOxid2
12# RemoteActivation
13# RemRelease
14# RemoteGetClassObject
15#
16#  Not yet:
17#
18#
19# Shouldn't dump errors against a win7
20#
21################################################################################
22
23import unittest
24import ConfigParser
25
26from impacket.dcerpc.v5 import transport
27from impacket.dcerpc.v5 import dcomrt
28from impacket.dcerpc.v5.dcom import scmp, vds, oaut, comev
29from impacket.uuid import string_to_bin, uuidtup_to_bin
30from impacket import ntlm
31
32
class DCOMTests(unittest.TestCase):
    """Live DCOM smoke tests that talk to a remote host through impacket.

    This base class holds no target configuration.  A concrete subclass must
    set the following attributes in its ``setUp``: ``username``, ``password``,
    ``domain``, ``machine``, ``stringBinding`` and ``hashes`` (either an empty
    string or ``'LMHASH:NTHASH'``).

    Methods named ``tes_*`` do not match the default ``test`` prefix of the
    unittest loader and are therefore not run; they read as work in progress.
    Most tests only check that the remote call completes without raising.
    """

    def setUp(self):
        """Skip when the unconfigured base class itself gets collected."""
        super(DCOMTests, self).setUp()
        # Discovery-based runners pick up this class as well as its
        # subclasses; without a configured target every test would error out.
        if type(self) is DCOMTests:
            self.skipTest('DCOMTests is an abstract base; run a configured subclass')

    def _split_hashes(self):
        """Return ``(lmhash, nthash)`` parsed from ``self.hashes``.

        An empty (or otherwise falsy) ``self.hashes`` yields ``('', '')``.

        Raises:
            ValueError: if a non-empty value is not exactly ``'LM:NT'``.
        """
        if self.hashes:
            lmhash, nthash = self.hashes.split(':')
            return lmhash, nthash
        return '', ''

    def _dcom_connect(self):
        """Create a ``DCOMConnection`` using the configured credentials.

        The LM/NT hashes are always forwarded so that a configuration that
        supplies hashes instead of a password works for every test.
        """
        lmhash, nthash = self._split_hashes()
        return dcomrt.DCOMConnection(self.machine, self.username, self.password,
                                     self.domain, lmhash, nthash)

    def connect(self):
        """Open a DCERPC connection to ``self.stringBinding``.

        Credentials are applied when the transport supports them, and the
        packet-integrity authentication level is requested before connecting.

        Returns:
            A ``(dce, rpctransport)`` tuple.
        """
        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
        lmhash, nthash = self._split_hashes()
        if hasattr(rpctransport, 'set_credentials'):
            # This method exists only for selected protocol sequences.
            rpctransport.set_credentials(self.username, self.password, self.domain, lmhash, nthash)
        dce = rpctransport.get_dce_rpc()
        dce.set_auth_level(ntlm.NTLM_AUTH_PKT_INTEGRITY)
        dce.connect()

        return dce, rpctransport

    def test_ServerAlive(self):
        """IObjectExporter.ServerAlive completes without raising."""
        dce, _ = self.connect()
        objExporter = dcomrt.IObjectExporter(dce)
        resp = objExporter.ServerAlive()
        #resp.dump()

    def test_ServerAlive2(self):
        """IObjectExporter.ServerAlive2 completes without raising."""
        dce, _ = self.connect()
        objExporter = dcomrt.IObjectExporter(dce)
        stringBindings = objExporter.ServerAlive2()
        #for binding in stringBindings:
        #    binding.dump()

    def test_ComplexPing_SimplePing(self):
        """ComplexPing followed by SimplePing on the returned set id."""
        dce, _ = self.connect()
        objExporter = dcomrt.IObjectExporter(dce)
        resp = objExporter.ComplexPing()
        objExporter.SimplePing(resp['pSetId'])

    def test_ResolveOxid(self):
        """ResolveOxid on the OXID of a freshly created EventSystem object."""
        dce, _ = self.connect()
        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)
        objExporter = dcomrt.IObjectExporter(dce)
        stringBindings = objExporter.ResolveOxid(iInterface.get_oxid(), (7,))
        #for binding in stringBindings:
        #    binding.dump()

    def test_ResolveOxid2(self):
        """ResolveOxid2 on the OXID of an object obtained via RemoteActivation."""
        dce, _ = self.connect()
        #scm = dcomrt.IRemoteSCMActivator(dce)
        #iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)
        scm = dcomrt.IActivation(dce)
        iInterface = scm.RemoteActivation(comev.CLSID_EventSystem, comev.IID_IEventSystem)
        objExporter = dcomrt.IObjectExporter(dce)
        stringBindings = objExporter.ResolveOxid2(iInterface.get_oxid(), (7,))
        #for binding in stringBindings:
        #    binding.dump()

    def test_RemoteActivation(self):
        """IActivation.RemoteActivation completes without raising."""
        dce, _ = self.connect()
        scm = dcomrt.IActivation(dce)
        scm.RemoteActivation(comev.CLSID_EventSystem, comev.IID_IEventSystem)

    def test_RemoteGetClassObject(self):
        """RemoteGetClassObject for IClassFactory, then release the reference."""
        dce, _ = self.connect()
        IID_IClassFactory = uuidtup_to_bin(('00000001-0000-0000-C000-000000000046','0.0'))
        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteGetClassObject(comev.CLSID_EventSystem, IID_IClassFactory)
        iInterface.RemRelease()

    def test_RemQueryInterface(self):
        """RemQueryInterface on an EventSystem object created over DCOM."""
        dcom = self._dcom_connect()
        try:
            iInterface = dcom.CoCreateInstanceEx(comev.CLSID_EventSystem, comev.IID_IEventSystem)
            iEventSystem = comev.IEventSystem(iInterface)
            iEventSystem.RemQueryInterface(1, (comev.IID_IEventSystem,))
        finally:
            # Tear the connection down even when one of the calls above fails.
            dcom.disconnect()

    def test_RemRelease(self):
        """RemRelease on an EventSystem object created over DCOM."""
        dcom = self._dcom_connect()
        try:
            iInterface = dcom.CoCreateInstanceEx(comev.CLSID_EventSystem, comev.IID_IEventSystem)
            iEventSystem = comev.IEventSystem(iInterface)
            iEventSystem.RemRelease()
        finally:
            # Tear the connection down even when one of the calls above fails.
            dcom.disconnect()

    def test_RemoteCreateInstance(self):
        """IRemoteSCMActivator.RemoteCreateInstance completes without raising."""
        dce, _ = self.connect()

        scm = dcomrt.IRemoteSCMActivator(dce)
        scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)

    def tes_scmp(self):
        """Disabled: enumerate volumes supported for snapshots via scmp."""
        dce, _ = self.connect()

        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(scmp.CLSID_ShadowCopyProvider, scmp.IID_IVssSnapshotMgmt)
        iVssSnapshotMgmt = scmp.IVssSnapshotMgmt(iInterface)
        #iVssSnapshotMgmt.RemRelease()

        iVssEnumMgmtObject = iVssSnapshotMgmt.QueryVolumesSupportedForSnapshots(scmp.IID_ShadowCopyProvider, 31)
        iVssEnumMgmtObject.Next(10)
        #iVssEnumObject = iVssSnapshotMgmt.QuerySnapshotsByVolume('C:\x00')

        #iProviderMgmtInterface = iVssSnapshotMgmt.GetProviderMgmtInterface()
        #enumObject =iProviderMgmtInterface.QueryDiffAreasOnVolume('C:\x00')
        #iVssSnapshotMgmt.RemQueryInterface(1, (scmp.IID_IVssEnumMgmtObject,))
        #iVssSnapshotMgmt.RemAddRef()
        #iVssSnapshotMgmt = dcom.hRemoteCreateInstance(dce, scmp.CLSID_ShadowCopyProvider, dcom.IID_IRemUnknown)

        #iVssEnumMgmtObject.RemQueryInterface(1, (scmp.IID_IVssEnumMgmtObject,))

    def tes_vds(self):
        """Disabled: initialise the virtual disk service and dump a provider."""
        dce, _ = self.connect()

        #objExporter = dcom.IObjectExporter(dce)
        #objExporter.ComplexPing()
        #objExporter.ComplexPing()

        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(vds.CLSID_VirtualDiskService, vds.IID_IVdsServiceInitialization)
        serviceInitialization = vds.IVdsServiceInitialization(iInterface)
        serviceInitialization.Initialize()

        iInterface = serviceInitialization.RemQueryInterface(1, (vds.IID_IVdsService,))
        vdsService = vds.IVdsService(iInterface)

        # Poll for as long as the call keeps reporting ErrorCode 1.
        resp = vdsService.IsServiceReady()
        while resp['ErrorCode'] == 1:
            print("Waiting.. ")
            resp = vdsService.IsServiceReady()

        vdsService.WaitForServiceReady()
        vdsService.GetProperties()
        enumObject = vdsService.QueryProviders(1)
        interfaces = enumObject.Next(1)
        iii = interfaces[0].RemQueryInterface(1, (vds.IID_IVdsProvider,))
        provider = vds.IVdsProvider(iii)
        resp = provider.GetProperties()
        resp.dump()

    def tes_oaut(self):
        """Disabled: walk IDispatch/ITypeInfo on a hard-coded CLSID."""
        dce, _ = self.connect()
        IID_IDispatch = string_to_bin('00020400-0000-0000-C000-000000000046')
        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(string_to_bin('4E14FBA2-2E22-11D1-9964-00C04FBBB345'), IID_IDispatch)
        iDispatch = oaut.IDispatch(iInterface)
        kk = iDispatch.GetTypeInfoCount()
        kk.dump()
        iTypeInfo = iDispatch.GetTypeInfo()
        iTypeInfo.GetTypeAttr()

    def tes_comev(self):
        """Disabled: browse type info, subscriptions and event classes of EventSystem."""
        dcom = self._dcom_connect()
        try:
            iInterface = dcom.CoCreateInstanceEx(comev.CLSID_EventSystem, comev.IID_IEventSystem)

            #scm = dcomrt.IRemoteSCMActivator(dce)

            #iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)
            #iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem,oaut.IID_IDispatch)
            iDispatch = oaut.IDispatch(iInterface)
            #scm = dcomrt.IRemoteSCMActivator(dce)
            #resp = iDispatch.GetIDsOfNames(('Navigate\x00', 'ExecWB\x00'))
            #resp.dump()
            iEventSystem = comev.IEventSystem(iInterface)
            iTypeInfo = iEventSystem.GetTypeInfo()
            resp = iTypeInfo.GetTypeAttr()
            #resp.dump()
            # NOTE(review): starts at 1 whereas tes_ie starts at 0 -- confirm
            # whether function index 0 is skipped on purpose.
            for i in range(1,resp['ppTypeAttr']['cFuncs']):
                resp = iTypeInfo.GetFuncDesc(i)
                #resp.dump()
                iTypeInfo.GetNames(resp['ppFuncDesc']['memid'])
                #resp2.dump()
                resp = iTypeInfo.GetDocumentation(resp['ppFuncDesc']['memid'])
                #resp.dump()
            #iEventSystem.get_EventObjectChangeEventClassID()
            # NOTE(review): iEventSystem is still used below after this
            # RemRelease -- confirm that is intended.
            iEventSystem.RemRelease()
            iTypeInfo.RemRelease()

            objCollection = iEventSystem.Query('EventSystem.EventSubscriptionCollection', 'ALL')

            resp = objCollection.get_Count()
            count = resp['pCount']

            evnObj = objCollection.get_NewEnum()
            #for i in range(count-1):
            for i in range(3):
                iUnknown = evnObj.Next(1)[0]
                es = iUnknown.RemQueryInterface(1, (comev.IID_IEventSubscription3,))
                es = comev.IEventSubscription3(es)

                #es.get_SubscriptionID()
                print(es.get_SubscriptionName()['pbstrSubscriptionName']['asData'])
                ##es.get_PublisherID()
                #es.get_EventClassID()
                #es.get_MethodName()
                ##es.get_SubscriberCLSID()
                #es.get_SubscriberInterface()
                #es.get_PerUser()
                #es.get_OwnerSID()
                #es.get_Enabled()
                ##es.get_Description()
                ##es.get_MachineName()
                ##es.GetPublisherProperty()
                #es.GetPublisherPropertyCollection()
                ##es.GetSubscriberProperty()
                #es.GetSubscriberPropertyCollection()
                #es.get_InterfaceID()
                es.RemRelease()

            objCollection = iEventSystem.Query('EventSystem.EventClassCollection', 'ALL')
            resp = objCollection.get_Count()
            count = resp['pCount']

            #objCollection.get_Item('EventClassID={D5978630-5B9F-11D1-8DD2-00AA004ABD5E}')
            evnObj = objCollection.get_NewEnum()
            #for i in range(count-1):
            for i in range(3):

                iUnknown = evnObj.Next(1)[0]

                ev = iUnknown.RemQueryInterface(1, (comev.IID_IEventClass2,))
                ev = comev.IEventClass2(ev)

                ev.get_EventClassID()
                #ev.get_EventClassName()
                #ev.get_OwnerSID()
                #ev.get_FiringInterfaceID()
                #ev.get_Description()
                #try:
                #    ev.get_TypeLib()
                #except:
                #    pass

                #ev.get_PublisherID()
                #ev.get_MultiInterfacePublisherFilterCLSID()
                #ev.get_AllowInprocActivation()
                #ev.get_FireInParallel()
                ev.RemRelease()

            print("="*80)
        finally:
            # Tear the connection down even when one of the calls above fails.
            dcom.disconnect()
        #eventSubscription.get_SubscriptionID()

    def tes_ie(self):
        """Disabled: list the function names exposed by a hard-coded CLSID.

        NOTE(review): ``ie`` is not imported anywhere in this module, so this
        method raises NameError as written; the module providing
        ``IWebBrowser`` has to be imported before enabling it.
        """
        # sys is only imported under the __main__ guard at module level.
        import sys

        dce, _ = self.connect()
        scm = dcomrt.IRemoteSCMActivator(dce)

        #iInterface = scm.RemoteCreateInstance(string_to_bin('0002DF01-0000-0000-C000-000000000046'),ie.IID_WebBrowser)
        iInterface = scm.RemoteCreateInstance(string_to_bin('72C24DD5-D70A-438B-8A42-98424B88AFB8'),dcomrt.IID_IRemUnknown)

        iDispatch = ie.IWebBrowser(iInterface)
        resp = iDispatch.GetIDsOfNames(('Navigate',))
        print(resp)
        #sys.exit(1)
        iTypeInfo = iDispatch.GetTypeInfo()
        resp = iTypeInfo.GetTypeAttr()
        #resp.dump()
        for i in range(0,resp['ppTypeAttr']['cFuncs']):
            resp = iTypeInfo.GetFuncDesc(i)
            #resp.dump()
            #resp2 = iTypeInfo.GetNames(resp['ppFuncDesc']['memid'])
            #print resp2['rgBstrNames'][0]['asData']
            resp = iTypeInfo.GetDocumentation(resp['ppFuncDesc']['memid'])
            print(resp['pBstrName']['asData'])
        #iEventSystem.get_EventObjectChangeEventClassID()
        print("ACA")
        iTypeInfo.RemRelease()
        iDispatch.RemRelease()

        sys.exit(1)
311
class TCPTransport(DCOMTests):
    """DCOM tests over ``ncacn_ip_tcp``, configured from ``dcetests.cfg``.

    All settings come from the ``TCPTransport`` section of the config file,
    which is looked up relative to the current working directory.
    """

    def setUp(self):
        """Load credentials and target, then derive the string binding."""
        DCOMTests.setUp(self)
        parser = ConfigParser.ConfigParser()
        parser.read('dcetests.cfg')
        section = 'TCPTransport'
        # (attribute set on self, option name in the section), in read order.
        wanted = (
            ('username', 'username'),
            ('domain', 'domain'),
            ('serverName', 'servername'),
            ('password', 'password'),
            ('machine', 'machine'),
            ('hashes', 'hashes'),
        )
        for attribute, option in wanted:
            setattr(self, attribute, parser.get(section, option))
        self.stringBinding = r'ncacn_ip_tcp:%s' % self.machine
        # (uuid, version) pair; not read by any code visible in this file.
        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
325
class TCPTransport64(DCOMTests):
    """Variant of the TCP suite that sets a different ``ts`` value.

    It reads the same ``TCPTransport`` section of ``dcetests.cfg`` (looked up
    relative to the current working directory).
    """

    def setUp(self):
        """Load credentials and target, then derive the string binding."""
        DCOMTests.setUp(self)
        settings = ConfigParser.ConfigParser()
        settings.read('dcetests.cfg')

        def option(name):
            # Every value lives in the shared 'TCPTransport' section.
            return settings.get('TCPTransport', name)

        self.username = option('username')
        self.domain = option('domain')
        self.serverName = option('servername')
        self.password = option('password')
        self.machine = option('machine')
        self.hashes = option('hashes')
        self.stringBinding = r'ncacn_ip_tcp:%s' % self.machine
        # (uuid, version) pair; not read by any code visible in this file.
        self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')
339
340
# Entry point: run the test case class named in argv[1], or both TCP transport
# suites when no argument is given.  The process exit status reflects the
# outcome so that calling scripts can detect failures.
if __name__ == '__main__':
    import sys

    loader = unittest.TestLoader()
    if len(sys.argv) > 1:
        testcase = sys.argv[1]
        candidate = globals().get(testcase)
        # Reject unknown names and globals that are not test case classes
        # with a readable message instead of a traceback.
        if not (isinstance(candidate, type) and issubclass(candidate, unittest.TestCase)):
            sys.exit('Unknown test case class: %s' % testcase)
        suite = loader.loadTestsFromTestCase(candidate)
    else:
        suite = loader.loadTestsFromTestCase(TCPTransport)
        suite.addTests(loader.loadTestsFromTestCase(TCPTransport64))
    result = unittest.TextTestRunner(verbosity=1).run(suite)
    sys.exit(0 if result.wasSuccessful() else 1)
351