###############################################################################
#  Tested so far:
#
#  Since DCOM is more high level, I'll always use the helper classes
#  ServerAlive
#  ServerAlive2
#  ComplexPing
#  SimplePing
#  RemoteCreateInstance
#  ResolveOxid
#  ResolveOxid2
#  RemoteActivation
#  RemRelease
#  RemoteGetClassObject
#
#  Not yet:
#
#
#  Shouldn't dump errors against a win7
#
################################################################################
#
# Connection settings are read from the [TCPTransport] section of
# 'dcetests.cfg' in the current working directory (options: username, domain,
# servername, password, machine, hashes).

import sys
import unittest

try:
    import ConfigParser
except ImportError:
    # The module was renamed in Python 3; keep the name used below.
    import configparser as ConfigParser

from impacket.dcerpc.v5 import transport
from impacket.dcerpc.v5 import dcomrt
from impacket.dcerpc.v5.dcom import scmp, vds, oaut, comev
from impacket.uuid import string_to_bin, uuidtup_to_bin
from impacket import ntlm


class DCOMTests(unittest.TestCase):
    """DCOM smoke tests that run against a live target.

    The tests contain no assertions: a test passes when its remote calls
    complete without raising.  Subclasses must set ``username``, ``password``,
    ``domain``, ``machine``, ``hashes`` and ``stringBinding`` in ``setUp``.

    Methods named ``tes_*`` are deliberately not picked up by the default
    unittest loader, which only collects names starting with ``test``.
    """

    def _load_config(self, section):
        """Read the connection settings for *section* from ``dcetests.cfg``.

        Sets ``username``, ``domain``, ``serverName``, ``password``,
        ``machine``, ``hashes`` and ``stringBinding`` on the instance.
        """
        configFile = ConfigParser.ConfigParser()
        configFile.read('dcetests.cfg')
        self.username = configFile.get(section, 'username')
        self.domain = configFile.get(section, 'domain')
        self.serverName = configFile.get(section, 'servername')
        self.password = configFile.get(section, 'password')
        self.machine = configFile.get(section, 'machine')
        self.hashes = configFile.get(section, 'hashes')
        self.stringBinding = r'ncacn_ip_tcp:%s' % self.machine

    def _split_hashes(self):
        """Return ``(lmhash, nthash)`` taken from ``self.hashes``.

        ``self.hashes`` is either empty (both values are then ``''``) or a
        string of the form ``LMHASH:NTHASH``.
        """
        if self.hashes:
            lmhash, nthash = self.hashes.split(':')
            return lmhash, nthash
        return '', ''

    def _dcom_connection(self):
        """Open a ``DCOMConnection`` to ``self.machine``.

        The configured hashes are always forwarded, so the DCOMConnection
        based tests authenticate the same way as the ones using ``connect``.
        The caller is responsible for calling ``disconnect()``.
        """
        lmhash, nthash = self._split_hashes()
        return dcomrt.DCOMConnection(self.machine, self.username, self.password, self.domain, lmhash, nthash)

    def connect(self):
        """Connect to ``self.stringBinding`` and return ``(dce, rpctransport)``."""
        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
        lmhash, nthash = self._split_hashes()
        if hasattr(rpctransport, 'set_credentials'):
            # This method exists only for selected protocol sequences.
            rpctransport.set_credentials(self.username, self.password, self.domain, lmhash, nthash)
        dce = rpctransport.get_dce_rpc()
        dce.set_auth_level(ntlm.NTLM_AUTH_PKT_INTEGRITY)
        dce.connect()

        return dce, rpctransport

    def test_ServerAlive(self):
        """Call IObjectExporter.ServerAlive."""
        dce, rpctransport = self.connect()
        objExporter = dcomrt.IObjectExporter(dce)
        resp = objExporter.ServerAlive()
        #resp.dump()

    def test_ServerAlive2(self):
        """Call IObjectExporter.ServerAlive2."""
        dce, rpctransport = self.connect()
        objExporter = dcomrt.IObjectExporter(dce)
        stringBindings = objExporter.ServerAlive2()
        #for binding in stringBindings:
        #    binding.dump()

    def test_ComplexPing_SimplePing(self):
        """Call ComplexPing, then SimplePing with the returned ``pSetId``."""
        dce, rpctransport = self.connect()
        objExporter = dcomrt.IObjectExporter(dce)
        resp = objExporter.ComplexPing()
        resp = objExporter.SimplePing(resp['pSetId'])

    def test_ResolveOxid(self):
        """Create an EventSystem instance and resolve its OXID."""
        dce, rpctransport = self.connect()
        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)
        objExporter = dcomrt.IObjectExporter(dce)
        stringBindings = objExporter.ResolveOxid(iInterface.get_oxid(), (7,))
        #for binding in stringBindings:
        #    binding.dump()

    def test_ResolveOxid2(self):
        """Activate EventSystem via IActivation and call ResolveOxid2."""
        dce, rpctransport = self.connect()
        #scm = dcomrt.IRemoteSCMActivator(dce)
        #iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)
        scm = dcomrt.IActivation(dce)
        iInterface = scm.RemoteActivation(comev.CLSID_EventSystem, comev.IID_IEventSystem)
        objExporter = dcomrt.IObjectExporter(dce)
        stringBindings = objExporter.ResolveOxid2(iInterface.get_oxid(), (7,))
        #for binding in stringBindings:
        #    binding.dump()

    def test_RemoteActivation(self):
        """Call IActivation.RemoteActivation for EventSystem."""
        dce, rpctransport = self.connect()
        scm = dcomrt.IActivation(dce)
        iInterface = scm.RemoteActivation(comev.CLSID_EventSystem, comev.IID_IEventSystem)

    def test_RemoteGetClassObject(self):
        """Fetch the EventSystem class factory and release it."""
        dce, rpctransport = self.connect()
        IID_IClassFactory = uuidtup_to_bin(('00000001-0000-0000-C000-000000000046', '0.0'))
        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteGetClassObject(comev.CLSID_EventSystem, IID_IClassFactory)
        iInterface.RemRelease()

    def test_RemQueryInterface(self):
        """Call RemQueryInterface on an EventSystem instance."""
        dcom = self._dcom_connection()
        try:
            iInterface = dcom.CoCreateInstanceEx(comev.CLSID_EventSystem, comev.IID_IEventSystem)
            iEventSystem = comev.IEventSystem(iInterface)
            iEventSystem.RemQueryInterface(1, (comev.IID_IEventSystem,))
        finally:
            # Disconnect even when one of the calls above raises.
            dcom.disconnect()

    def test_RemRelease(self):
        """Call RemRelease on an EventSystem instance."""
        dcom = self._dcom_connection()
        try:
            iInterface = dcom.CoCreateInstanceEx(comev.CLSID_EventSystem, comev.IID_IEventSystem)
            iEventSystem = comev.IEventSystem(iInterface)
            iEventSystem.RemRelease()
        finally:
            # Disconnect even when one of the calls above raises.
            dcom.disconnect()

    def test_RemoteCreateInstance(self):
        """Call IRemoteSCMActivator.RemoteCreateInstance for EventSystem."""
        dce, rpctransport = self.connect()

        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)

    def tes_scmp(self):
        """Disabled: query the volumes supported by the shadow copy provider."""
        dce, rpctransport = self.connect()

        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(scmp.CLSID_ShadowCopyProvider, scmp.IID_IVssSnapshotMgmt)
        iVssSnapshotMgmt = scmp.IVssSnapshotMgmt(iInterface)
        #iVssSnapshotMgmt.RemRelease()

        iVssEnumMgmtObject = iVssSnapshotMgmt.QueryVolumesSupportedForSnapshots(scmp.IID_ShadowCopyProvider, 31)
        resp = iVssEnumMgmtObject.Next(10)
        #iVssEnumObject = iVssSnapshotMgmt.QuerySnapshotsByVolume('C:\x00')

        #iProviderMgmtInterface = iVssSnapshotMgmt.GetProviderMgmtInterface()
        #enumObject =iProviderMgmtInterface.QueryDiffAreasOnVolume('C:\x00')
        #iVssSnapshotMgmt.RemQueryInterface(1, (scmp.IID_IVssEnumMgmtObject,))
        #iVssSnapshotMgmt.RemAddRef()
        #iVssSnapshotMgmt = dcom.hRemoteCreateInstance(dce, scmp.CLSID_ShadowCopyProvider, dcom.IID_IRemUnknown)

        #iVssEnumMgmtObject.RemQueryInterface(1, (scmp.IID_IVssEnumMgmtObject,))

    def tes_vds(self):
        """Disabled: initialize the Virtual Disk Service and dump one provider."""
        dce, rpctransport = self.connect()

        #objExporter = dcom.IObjectExporter(dce)
        #objExporter.ComplexPing()
        #objExporter.ComplexPing()

        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(vds.CLSID_VirtualDiskService, vds.IID_IVdsServiceInitialization)
        serviceInitialization = vds.IVdsServiceInitialization(iInterface)
        serviceInitialization.Initialize()

        iInterface = serviceInitialization.RemQueryInterface(1, (vds.IID_IVdsService,))
        vdsService = vds.IVdsService(iInterface)

        # Poll until IsServiceReady stops returning ErrorCode 1.
        resp = vdsService.IsServiceReady()
        while resp['ErrorCode'] == 1:
            print("Waiting.. ")
            resp = vdsService.IsServiceReady()

        vdsService.WaitForServiceReady()
        vdsService.GetProperties()
        enumObject = vdsService.QueryProviders(1)
        interfaces = enumObject.Next(1)
        iii = interfaces[0].RemQueryInterface(1, (vds.IID_IVdsProvider,))
        provider = vds.IVdsProvider(iii)
        resp = provider.GetProperties()
        resp.dump()

    def tes_oaut(self):
        """Disabled: exercise IDispatch / ITypeInfo on a fixed CLSID."""
        dce, rpctransport = self.connect()
        IID_IDispatch = string_to_bin('00020400-0000-0000-C000-000000000046')
        IID_ITypeInfo = string_to_bin('00020401-0000-0000-C000-000000000046')
        scm = dcomrt.IRemoteSCMActivator(dce)
        iInterface = scm.RemoteCreateInstance(string_to_bin('4E14FBA2-2E22-11D1-9964-00C04FBBB345'), IID_IDispatch)
        iDispatch = oaut.IDispatch(iInterface)
        kk = iDispatch.GetTypeInfoCount()
        kk.dump()
        iTypeInfo = iDispatch.GetTypeInfo()
        iTypeInfo.GetTypeAttr()

    def tes_comev(self):
        """Disabled: walk the EventSystem type info, subscriptions and classes."""
        dcom = self._dcom_connection()
        try:
            iInterface = dcom.CoCreateInstanceEx(comev.CLSID_EventSystem, comev.IID_IEventSystem)

            #scm = dcomrt.IRemoteSCMActivator(dce)

            #iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem, comev.IID_IEventSystem)
            #iInterface = scm.RemoteCreateInstance(comev.CLSID_EventSystem,oaut.IID_IDispatch)
            iDispatch = oaut.IDispatch(iInterface)
            #scm = dcomrt.IRemoteSCMActivator(dce)
            #resp = iDispatch.GetIDsOfNames(('Navigate\x00', 'ExecWB\x00'))
            #resp.dump()
            iEventSystem = comev.IEventSystem(iInterface)
            iTypeInfo = iEventSystem.GetTypeInfo()
            resp = iTypeInfo.GetTypeAttr()
            #resp.dump()
            # The range is computed once, before ``resp`` is rebound in the loop.
            for i in range(1, resp['ppTypeAttr']['cFuncs']):
                resp = iTypeInfo.GetFuncDesc(i)
                #resp.dump()
                resp2 = iTypeInfo.GetNames(resp['ppFuncDesc']['memid'])
                #resp2.dump()
                resp = iTypeInfo.GetDocumentation(resp['ppFuncDesc']['memid'])
                #resp.dump()
            #iEventSystem.get_EventObjectChangeEventClassID()
            iEventSystem.RemRelease()
            iTypeInfo.RemRelease()

            objCollection = iEventSystem.Query('EventSystem.EventSubscriptionCollection', 'ALL')

            resp = objCollection.get_Count()
            count = resp['pCount']

            evnObj = objCollection.get_NewEnum()
            #for i in range(count-1):
            for i in range(3):
                iUnknown = evnObj.Next(1)[0]
                es = iUnknown.RemQueryInterface(1, (comev.IID_IEventSubscription3,))
                es = comev.IEventSubscription3(es)

                #es.get_SubscriptionID()
                print(es.get_SubscriptionName()['pbstrSubscriptionName']['asData'])
                ##es.get_PublisherID()
                #es.get_EventClassID()
                #es.get_MethodName()
                ##es.get_SubscriberCLSID()
                #es.get_SubscriberInterface()
                #es.get_PerUser()
                #es.get_OwnerSID()
                #es.get_Enabled()
                ##es.get_Description()
                ##es.get_MachineName()
                ##es.GetPublisherProperty()
                #es.GetPublisherPropertyCollection()
                ##es.GetSubscriberProperty()
                #es.GetSubscriberPropertyCollection()
                #es.get_InterfaceID()
                es.RemRelease()

            objCollection = iEventSystem.Query('EventSystem.EventClassCollection', 'ALL')
            resp = objCollection.get_Count()
            count = resp['pCount']

            #objCollection.get_Item('EventClassID={D5978630-5B9F-11D1-8DD2-00AA004ABD5E}')
            evnObj = objCollection.get_NewEnum()
            #for i in range(count-1):
            for i in range(3):
                iUnknown = evnObj.Next(1)[0]

                ev = iUnknown.RemQueryInterface(1, (comev.IID_IEventClass2,))
                ev = comev.IEventClass2(ev)

                ev.get_EventClassID()
                #ev.get_EventClassName()
                #ev.get_OwnerSID()
                #ev.get_FiringInterfaceID()
                #ev.get_Description()
                #try:
                #    ev.get_TypeLib()
                #except:
                #    pass

                #ev.get_PublisherID()
                #ev.get_MultiInterfacePublisherFilterCLSID()
                #ev.get_AllowInprocActivation()
                #ev.get_FireInParallel()
                ev.RemRelease()

            print("=" * 80)
        finally:
            # Disconnect even when one of the calls above raises.
            dcom.disconnect()
        #eventSubscription.get_SubscriptionID()

    def tes_ie(self):
        """Disabled: list the function names exposed by a browser object.

        NOTE(review): ``ie`` is not imported anywhere in this file, so this
        method raises NameError as written; the module it should come from
        needs to be confirmed before the test can be enabled.
        """
        dce, rpctransport = self.connect()
        scm = dcomrt.IRemoteSCMActivator(dce)

        #iInterface = scm.RemoteCreateInstance(string_to_bin('0002DF01-0000-0000-C000-000000000046'),ie.IID_WebBrowser)
        iInterface = scm.RemoteCreateInstance(string_to_bin('72C24DD5-D70A-438B-8A42-98424B88AFB8'), dcomrt.IID_IRemUnknown)

        iDispatch = ie.IWebBrowser(iInterface)
        resp = iDispatch.GetIDsOfNames(('Navigate',))
        print(resp)
        #sys.exit(1)
        iTypeInfo = iDispatch.GetTypeInfo()
        resp = iTypeInfo.GetTypeAttr()
        #resp.dump()
        # The range is computed once, before ``resp`` is rebound in the loop.
        for i in range(0, resp['ppTypeAttr']['cFuncs']):
            resp = iTypeInfo.GetFuncDesc(i)
            #resp.dump()
            #resp2 = iTypeInfo.GetNames(resp['ppFuncDesc']['memid'])
            #print resp2['rgBstrNames'][0]['asData']
            resp = iTypeInfo.GetDocumentation(resp['ppFuncDesc']['memid'])
            print(resp['pBstrName']['asData'])
        #iEventSystem.get_EventObjectChangeEventClassID()
        print("ACA")
        iTypeInfo.RemRelease()
        iDispatch.RemRelease()

        # Debugging leftover: raises SystemExit, which would abort the whole
        # test run if this method were ever enabled.
        sys.exit(1)


class TCPTransport(DCOMTests):
    """Run the DCOM tests over ``ncacn_ip_tcp`` using the [TCPTransport] settings."""

    def setUp(self):
        DCOMTests.setUp(self)
        self._load_config('TCPTransport')
        # NOTE(review): ``ts`` is not read anywhere in this file; presumably
        # it is meant to select the transfer syntax -- confirm.
        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')


class TCPTransport64(DCOMTests):
    """Same settings as ``TCPTransport`` but with a different ``ts`` tuple.

    The [TCPTransport] config section is reused on purpose; there is no
    separate section for this class.
    """

    def setUp(self):
        DCOMTests.setUp(self)
        self._load_config('TCPTransport')
        # NOTE(review): ``ts`` is not read anywhere in this file, so this class
        # currently runs exactly the same tests as TCPTransport -- confirm
        # whether the value should be applied to the connection.
        self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')


# Process command-line arguments.
if __name__ == '__main__':
    if len(sys.argv) > 1:
        # The first argument names the TestCase class to run, e.g. TCPTransport.
        testcase = sys.argv[1]
        suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase])
    else:
        suite = unittest.TestLoader().loadTestsFromTestCase(TCPTransport)
        suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TCPTransport64))
    unittest.TextTestRunner(verbosity=1).run(suite)