1###############################################################################
2#  Tested so far:
3#
4# ElfrOpenBELW
5# hElfrOpenBELW
6# ElfrOpenELW
7# hElfrOpenELW
8# ElfrRegisterEventSourceW
9# hElfrRegisterEventSourceW
10#
11#  Not yet:
12#
13# Shouldn't dump errors against a win7
14#
15################################################################################
16
17import unittest
18import ConfigParser
19
20from impacket.dcerpc.v5 import transport
21from impacket.dcerpc.v5 import epm, even
22from impacket.dcerpc.v5.dtypes import NULL, MAXIMUM_ALLOWED, OWNER_SECURITY_INFORMATION
23
24
25class RRPTests(unittest.TestCase):
26    def connect(self):
27        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
28        if len(self.hashes) > 0:
29            lmhash, nthash = self.hashes.split(':')
30        else:
31            lmhash = ''
32            nthash = ''
33        if hasattr(rpctransport, 'set_credentials'):
34            # This method exists only for selected protocol sequences.
35            rpctransport.set_credentials(self.username,self.password, self.domain, lmhash, nthash)
36        dce = rpctransport.get_dce_rpc()
37        #dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_INTEGRITY)
38        dce.connect()
39        dce.bind(even.MSRPC_UUID_EVEN, transfer_syntax = self.ts)
40
41        return dce, rpctransport
42
43    def atest_ElfrOpenBELW(self):
44        dce, rpctransport = self.connect()
45        request = even.ElfrOpenBELW()
46        request['UNCServerName'] = NULL
47        request['BackupFileName'] = '\\??\\BETO'
48        request['MajorVersion'] = 1
49        request['MinorVersion'] = 1
50        try:
51            resp = dce.request(request)
52        except Exception, e:
53            if str(e).find('STATUS_OBJECT_NAME_NOT_FOUND') < 0:
54                raise
55            resp = e.get_packet()
56        resp.dump()
57
58    def atest_hElfrOpenBELW(self):
59        dce, rpctransport = self.connect()
60        try:
61            resp = even.hElfrOpenBELW(dce, NULL, '\\??\\BETO')
62        except Exception, e:
63            if str(e).find('STATUS_OBJECT_NAME_NOT_FOUND') < 0:
64                raise
65            resp = e.get_packet()
66        resp.dump()
67
68    def atest_ElfrOpenELW(self):
69        dce, rpctransport = self.connect()
70        request = even.ElfrOpenELW()
71        request['UNCServerName'] = NULL
72        request['ModuleName'] = 'Security'
73        request['RegModuleName'] = ''
74        request['MajorVersion'] = 1
75        request['MinorVersion'] = 1
76        resp = dce.request(request)
77        resp.dump()
78
79    def atest_hElfrOpenELW(self):
80        dce, rpctransport = self.connect()
81        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
82        resp.dump()
83
84    def atest_ElfrRegisterEventSourceW(self):
85        dce, rpctransport = self.connect()
86        request = even.ElfrRegisterEventSourceW()
87        request['UNCServerName'] = NULL
88        request['ModuleName'] = 'Security'
89        request['RegModuleName'] = ''
90        request['MajorVersion'] = 1
91        request['MinorVersion'] = 1
92        try:
93            resp = dce.request(request)
94            resp.dump()
95        except Exception, e:
96            if str(e).find('STATUS_ACCESS_DENIED') < 0:
97                raise
98
99    def atest_hElfrRegisterEventSourceW(self):
100        dce, rpctransport = self.connect()
101        try:
102            resp = even.hElfrRegisterEventSourceW(dce, NULL, 'Security', '')
103            resp.dump()
104        except Exception, e:
105            if str(e).find('STATUS_ACCESS_DENIED') < 0:
106                raise
107
108    def atest_ElfrReadELW(self):
109        dce, rpctransport = self.connect()
110        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
111        resp.dump()
112        request = even.ElfrReadELW()
113        request['LogHandle'] = resp['LogHandle']
114        request['ReadFlags'] = even.EVENTLOG_SEQUENTIAL_READ | even.EVENTLOG_FORWARDS_READ
115        request['RecordOffset'] = 0
116        request['NumberOfBytesToRead'] = even.MAX_BATCH_BUFF/2
117        resp = dce.request(request)
118        resp.dump()
119
120    def atest_hElfrReadELW(self):
121        dce, rpctransport = self.connect()
122        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
123        resp.dump()
124        resp = even.hElfrReadELW(dce, resp['LogHandle'],even.EVENTLOG_SEQUENTIAL_READ | even.EVENTLOG_FORWARDS_READ,0, even.MAX_BATCH_BUFF/2 )
125        resp.dump()
126
127    def atest_ElfrClearELFW(self):
128        dce, rpctransport = self.connect()
129        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
130        resp.dump()
131        request = even.ElfrClearELFW()
132        request['LogHandle'] = resp['LogHandle']
133        request['BackupFileName'] = '\\??\\c:\\beto2'
134        try:
135            resp = dce.request(request)
136            resp.dump()
137        except Exception, e:
138            if str(e).find('STATUS_OBJECT_NAME_INVALID') < 0:
139                raise
140
141    def atest_hElfrClearELFW(self):
142        dce, rpctransport = self.connect()
143        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
144        resp.dump()
145        try:
146            resp = even.hElfrClearELFW(dce, resp['LogHandle'], '\\??\\c:\\beto2')
147            resp.dump()
148        except Exception, e:
149            if str(e).find('STATUS_OBJECT_NAME_INVALID') < 0:
150                raise
151
152    def atest_ElfrBackupELFW(self):
153        dce, rpctransport = self.connect()
154        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
155        resp.dump()
156        request = even.ElfrBackupELFW()
157        request['LogHandle'] = resp['LogHandle']
158        request['BackupFileName'] = '\\??\\c:\\beto2'
159        try:
160            resp = dce.request(request)
161            resp.dump()
162        except Exception, e:
163            if str(e).find('STATUS_OBJECT_NAME_INVALID') < 0:
164                raise
165
166    def atest_hElfrBackupELFW(self):
167        dce, rpctransport = self.connect()
168        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
169        resp.dump()
170        try:
171            resp = even.hElfrBackupELFW(dce, resp['LogHandle'], '\\??\\c:\\beto2')
172            resp.dump()
173        except Exception, e:
174            if str(e).find('STATUS_OBJECT_NAME_INVALID') < 0:
175                raise
176
177    def test_ElfrReportEventW(self):
178        dce, rpctransport = self.connect()
179        resp = even.hElfrOpenELW(dce, NULL, 'Security', '')
180        resp.dump()
181        request = even.ElfrReportEventW()
182        request['LogHandle'] = resp['LogHandle']
183        request['Time'] = 5000000
184        request['EventType'] = even.EVENTLOG_ERROR_TYPE
185        request['EventCategory'] = 0
186        request['EventID'] = 7037
187        request['ComputerName'] = 'MYCOMPUTER!'
188        request['NumStrings'] = 1
189        request['DataSize'] = 0
190        request['UserSID'].fromCanonical('S-1-2-5-21')
191        nn = even.PRPC_UNICODE_STRING()
192        nn['Data'] = 'HOLA BETUSSS'
193        request['Strings'].append(nn)
194        request['Data'] = NULL
195        request['Flags'] = 0
196        request['RecordNumber'] = NULL
197        request['TimeWritten'] = NULL
198        try:
199            resp = dce.request(request)
200            resp.dump()
201        except Exception, e:
202            if str(e).find('STATUS_ACCESS_DENIED') < 0:
203                raise
204
205class SMBTransport(RRPTests):
206    def setUp(self):
207        RRPTests.setUp(self)
208        configFile = ConfigParser.ConfigParser()
209        configFile.read('dcetests.cfg')
210        self.username = configFile.get('SMBTransport', 'username')
211        self.domain   = configFile.get('SMBTransport', 'domain')
212        self.serverName = configFile.get('SMBTransport', 'servername')
213        self.password = configFile.get('SMBTransport', 'password')
214        self.machine  = configFile.get('SMBTransport', 'machine')
215        self.hashes   = configFile.get('SMBTransport', 'hashes')
216        self.stringBinding = r'ncacn_np:%s[\PIPE\eventlog]' % self.machine
217        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
218
219class SMBTransport64(RRPTests):
220    def setUp(self):
221        RRPTests.setUp(self)
222        configFile = ConfigParser.ConfigParser()
223        configFile.read('dcetests.cfg')
224        self.username = configFile.get('SMBTransport', 'username')
225        self.domain   = configFile.get('SMBTransport', 'domain')
226        self.serverName = configFile.get('SMBTransport', 'servername')
227        self.password = configFile.get('SMBTransport', 'password')
228        self.machine  = configFile.get('SMBTransport', 'machine')
229        self.hashes   = configFile.get('SMBTransport', 'hashes')
230        self.stringBinding = r'ncacn_np:%s[\PIPE\eventlog]' % self.machine
231        self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')
232
233# Process command-line arguments.
234if __name__ == '__main__':
235    import sys
236    if len(sys.argv) > 1:
237        testcase = sys.argv[1]
238        suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase])
239    else:
240        suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport)
241        #suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport64))
242    unittest.TextTestRunner(verbosity=1).run(suite)
243