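###############################################################################
# Tested so far:
#
# ElfrOpenBELW
# hElfrOpenBELW
# ElfrOpenELW
# hElfrOpenELW
# ElfrRegisterEventSourceW
# hElfrRegisterEventSourceW
#
# Not yet:
#
# Shouldn't dump errors against a win7
#
# WARNING: these are live tests against the host named in dcetests.cfg, and
# several calls are NOT read-only on a target that accepts them:
#   - ElfrClearELFW clears the Security log (after writing a backup file),
#   - ElfrBackupELFW writes a file on the server,
#   - ElfrReportEventW injects a forged record into the Security log.
# Run them against a lab machine only.  Methods prefixed "atest_" are
# disabled (unittest only collects "test_*"); rename to "test_" to enable.
###############################################################################

import unittest
try:
    import ConfigParser                      # Python 2
except ImportError:                          # Python 3
    import configparser as ConfigParser

from impacket.dcerpc.v5 import transport
from impacket.dcerpc.v5 import epm, even
from impacket.dcerpc.v5.dtypes import NULL, MAXIMUM_ALLOWED, OWNER_SECURITY_INFORMATION


class RRPTests(unittest.TestCase):
    """Live tests for the MS-EVEN (EventLog Remoting Protocol) interface.

    The class is named after the RRP test file this one was cloned from; the
    name is kept because the transport subclasses and the ``__main__`` lookup
    refer to it.  Concrete subclasses provide ``stringBinding``, ``ts`` and
    the credential attributes in ``setUp``.

    Server-side NTSTATUS failures surface from impacket as a generic
    exception whose message carries the symbolic status name, so the
    expected-failure tests match on that text via ``_is_status``.
    """

    # Transfer syntaxes the subclasses bind with: NDR 2.0 and NDR64.
    NDR20 = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
    NDR64 = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')

    # Fixed test inputs.  The backup names are NT object-manager paths
    # resolved on the *server*; they are chosen to be rejected, but a target
    # that accepts them will create that file.
    LOG_NAME = 'Security'
    BOGUS_BACKUP_LOG = '\\??\\BETO'
    BACKUP_PATH = '\\??\\c:\\beto2'
    CONFIG_FILE = 'dcetests.cfg'

    def _load_config(self, section):
        """Populate target and credential attributes from CONFIG_FILE.

        The file is looked up relative to the current working directory and
        holds a clear-text password and/or NTLM hashes: keep it out of
        version control and readable only by the account running the tests.

        Raises IOError when the file cannot be read, instead of the less
        helpful NoSectionError ConfigParser would raise on the first get().
        """
        config = ConfigParser.ConfigParser()
        if not config.read(self.CONFIG_FILE):
            raise IOError('cannot read %s (looked up in the cwd)' % self.CONFIG_FILE)
        self.username = config.get(section, 'username')
        self.domain = config.get(section, 'domain')
        self.serverName = config.get(section, 'servername')
        self.password = config.get(section, 'password')
        self.machine = config.get(section, 'machine')
        self.hashes = config.get(section, 'hashes')

    @staticmethod
    def _split_hashes(hashes):
        """Split impacket's ``LMHASH:NTHASH`` string into ``(lmhash, nthash)``.

        An empty or None value means password authentication and yields two
        empty strings.  A bare NT hash (no colon) is accepted as
        ``('', nthash)``; anything with more than one colon raises
        ValueError with a clear message rather than an unpack error.
        """
        hashes = (hashes or '').strip()
        if not hashes:
            return '', ''
        parts = hashes.split(':')
        if len(parts) == 1:
            return '', parts[0]
        if len(parts) != 2:
            raise ValueError('hashes must be LMHASH:NTHASH, got %r' % hashes)
        return parts[0], parts[1]

    @staticmethod
    def _is_status(exc, status):
        """Return True when the error text of *exc* names the NTSTATUS *status*."""
        return status in str(exc)

    def connect(self):
        """Connect and bind to the EVEN interface; return ``(dce, rpctransport)``.

        Credentials come from the subclass setUp.  When ``hashes`` is set it
        is used for pass-the-hash in place of the password.

        NOTE(review): the bind uses the transport's default authentication
        level (the PKT_INTEGRITY line is commented out), so DCERPC itself
        neither signs nor seals the payload; on ncacn_np the only protection
        is SMB signing/encryption, if the target enforces it.  Acceptable
        for a lab harness, not for anything reached over an untrusted path.
        """
        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
        lmhash, nthash = self._split_hashes(self.hashes)
        if hasattr(rpctransport, 'set_credentials'):
            # Only some protocol sequences (e.g. ncacn_np) carry credentials.
            rpctransport.set_credentials(self.username, self.password, self.domain, lmhash, nthash)
        dce = rpctransport.get_dce_rpc()
        #dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY)
        dce.connect()
        dce.bind(even.MSRPC_UUID_EVEN, transfer_syntax=self.ts)
        return dce, rpctransport

    def _open_log(self, dce, log_name=None):
        """Open *log_name* (default LOG_NAME) via hElfrOpenELW and return its LogHandle."""
        resp = even.hElfrOpenELW(dce, NULL, log_name or self.LOG_NAME, '')
        resp.dump()
        return resp['LogHandle']

    def atest_ElfrOpenBELW(self):
        # Open a backup log by NT path.  BOGUS_BACKUP_LOG is expected not to
        # exist on the server, so STATUS_OBJECT_NAME_NOT_FOUND is the pass case.
        dce, _ = self.connect()
        request = even.ElfrOpenBELW()
        request['UNCServerName'] = NULL
        request['BackupFileName'] = self.BOGUS_BACKUP_LOG
        request['MajorVersion'] = 1
        request['MinorVersion'] = 1
        try:
            resp = dce.request(request)
        except Exception as e:
            if not self._is_status(e, 'STATUS_OBJECT_NAME_NOT_FOUND'):
                raise
            resp = e.get_packet()
        resp.dump()

    def atest_hElfrOpenBELW(self):
        # Same as atest_ElfrOpenBELW through the helper wrapper.
        dce, _ = self.connect()
        try:
            resp = even.hElfrOpenBELW(dce, NULL, self.BOGUS_BACKUP_LOG)
        except Exception as e:
            if not self._is_status(e, 'STATUS_OBJECT_NAME_NOT_FOUND'):
                raise
            resp = e.get_packet()
        resp.dump()

    def atest_ElfrOpenELW(self):
        # Open the Security log for reading; requires an account the target
        # allows to read that log (the harness assumes admin credentials).
        dce, _ = self.connect()
        request = even.ElfrOpenELW()
        request['UNCServerName'] = NULL
        request['ModuleName'] = self.LOG_NAME
        request['RegModuleName'] = ''
        request['MajorVersion'] = 1
        request['MinorVersion'] = 1
        resp = dce.request(request)
        resp.dump()

    def atest_hElfrOpenELW(self):
        dce, _ = self.connect()
        self._open_log(dce)

    def atest_ElfrRegisterEventSourceW(self):
        # Registering an event source for the Security log is expected to be
        # refused (STATUS_ACCESS_DENIED).  A target that grants it hands a
        # remote caller a handle usable for writing records into that log.
        dce, _ = self.connect()
        request = even.ElfrRegisterEventSourceW()
        request['UNCServerName'] = NULL
        request['ModuleName'] = self.LOG_NAME
        request['RegModuleName'] = ''
        request['MajorVersion'] = 1
        request['MinorVersion'] = 1
        try:
            resp = dce.request(request)
            resp.dump()
        except Exception as e:
            if not self._is_status(e, 'STATUS_ACCESS_DENIED'):
                raise

    def atest_hElfrRegisterEventSourceW(self):
        dce, _ = self.connect()
        try:
            resp = even.hElfrRegisterEventSourceW(dce, NULL, self.LOG_NAME, '')
            resp.dump()
        except Exception as e:
            if not self._is_status(e, 'STATUS_ACCESS_DENIED'):
                raise

    def atest_ElfrReadELW(self):
        dce, _ = self.connect()
        request = even.ElfrReadELW()
        request['LogHandle'] = self._open_log(dce)
        request['ReadFlags'] = even.EVENTLOG_SEQUENTIAL_READ | even.EVENTLOG_FORWARDS_READ
        request['RecordOffset'] = 0
        # Integer division on purpose: the field is an unsigned int and '/'
        # would yield a float on Python 3.
        request['NumberOfBytesToRead'] = even.MAX_BATCH_BUFF // 2
        resp = dce.request(request)
        resp.dump()

    def atest_hElfrReadELW(self):
        dce, _ = self.connect()
        handle = self._open_log(dce)
        resp = even.hElfrReadELW(dce, handle,
                                 even.EVENTLOG_SEQUENTIAL_READ | even.EVENTLOG_FORWARDS_READ,
                                 0, even.MAX_BATCH_BUFF // 2)
        resp.dump()

    def atest_ElfrClearELFW(self):
        # DESTRUCTIVE if it succeeds: the server backs the Security log up to
        # BACKUP_PATH and then clears it.  The path is expected to be rejected
        # with STATUS_OBJECT_NAME_INVALID before that happens.
        dce, _ = self.connect()
        request = even.ElfrClearELFW()
        request['LogHandle'] = self._open_log(dce)
        request['BackupFileName'] = self.BACKUP_PATH
        try:
            resp = dce.request(request)
            resp.dump()
        except Exception as e:
            if not self._is_status(e, 'STATUS_OBJECT_NAME_INVALID'):
                raise

    def atest_hElfrClearELFW(self):
        # See atest_ElfrClearELFW: clears the Security log on a target that
        # accepts BACKUP_PATH.
        dce, _ = self.connect()
        handle = self._open_log(dce)
        try:
            resp = even.hElfrClearELFW(dce, handle, self.BACKUP_PATH)
            resp.dump()
        except Exception as e:
            if not self._is_status(e, 'STATUS_OBJECT_NAME_INVALID'):
                raise

    def atest_ElfrBackupELFW(self):
        # Writes a copy of the Security log to BACKUP_PATH on the server if
        # the path is accepted; STATUS_OBJECT_NAME_INVALID is the expected
        # rejection.
        dce, _ = self.connect()
        request = even.ElfrBackupELFW()
        request['LogHandle'] = self._open_log(dce)
        request['BackupFileName'] = self.BACKUP_PATH
        try:
            resp = dce.request(request)
            resp.dump()
        except Exception as e:
            if not self._is_status(e, 'STATUS_OBJECT_NAME_INVALID'):
                raise

    def atest_hElfrBackupELFW(self):
        dce, _ = self.connect()
        handle = self._open_log(dce)
        try:
            resp = even.hElfrBackupELFW(dce, handle, self.BACKUP_PATH)
            resp.dump()
        except Exception as e:
            if not self._is_status(e, 'STATUS_OBJECT_NAME_INVALID'):
                raise

    def test_ElfrReportEventW(self):
        # Try to write a record with a spoofed ComputerName and an arbitrary
        # SID into the Security log.  STATUS_ACCESS_DENIED is the expected
        # (and desirable) outcome; a target that accepts this lets any
        # authenticated remote caller plant forged audit records.
        dce, _ = self.connect()
        request = even.ElfrReportEventW()
        request['LogHandle'] = self._open_log(dce)
        request['Time'] = 5000000
        request['EventType'] = even.EVENTLOG_ERROR_TYPE
        request['EventCategory'] = 0
        request['EventID'] = 7037
        request['ComputerName'] = 'MYCOMPUTER!'
        request['NumStrings'] = 1
        request['DataSize'] = 0
        request['UserSID'].fromCanonical('S-1-2-5-21')
        nn = even.PRPC_UNICODE_STRING()
        nn['Data'] = 'HOLA BETUSSS'
        request['Strings'].append(nn)
        request['Data'] = NULL
        request['Flags'] = 0
        request['RecordNumber'] = NULL
        request['TimeWritten'] = NULL
        try:
            resp = dce.request(request)
            resp.dump()
        except Exception as e:
            if not self._is_status(e, 'STATUS_ACCESS_DENIED'):
                raise


class SMBTransport(RRPTests):
    """EVEN over ncacn_np (the eventlog named pipe) with the NDR 2.0 transfer syntax."""

    def setUp(self):
        RRPTests.setUp(self)
        self._load_config('SMBTransport')
        self.stringBinding = r'ncacn_np:%s[\PIPE\eventlog]' % self.machine
        self.ts = self.NDR20


class SMBTransport64(RRPTests):
    """Same pipe and credentials as SMBTransport, bound with the NDR64 transfer syntax."""

    def setUp(self):
        RRPTests.setUp(self)
        # Deliberately shares the [SMBTransport] config section.
        self._load_config('SMBTransport')
        self.stringBinding = r'ncacn_np:%s[\PIPE\eventlog]' % self.machine
        self.ts = self.NDR64


# Process command-line arguments.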
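if __name__ == '__main__':
    import sys
    if len(sys.argv) > 1:
        testcase = sys.argv[1]
        # Resolve the name only to a TestCase class defined in this module.
        # A bare globals()[name] would hand any global (a module, a helper)
        # to the loader, or KeyError on a typo; fail with a usable message.
        cls = globals().get(testcase)
        if not (isinstance(cls, type) and issubclass(cls, unittest.TestCase)):
            sys.exit('unknown test case %r (expected e.g. SMBTransport or SMBTransport64)' % testcase)
        suite = unittest.TestLoader().loadTestsFromTestCase(cls)
    else:
        # Default to the NDR 2.0 binding; NDR64 stays opt-in via argv.
        suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport)
        #suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport64))
    unittest.TextTestRunner(verbosity=1).run(suite)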