################################################################################
#  Tested so far:
#
# NetrWkstaGetInfo
# NetrWkstaSetInfo
# NetrWkstaUserEnum
# NetrWkstaTransportEnum
# NetrWkstaTransportAdd
# NetrUseAdd
# NetrUseGetInfo
# NetrUseDel
# NetrUseEnum
# NetrWorkstationStatisticsGet
# NetrGetJoinInformation
# NetrJoinDomain2
# NetrUnjoinDomain2
# NetrRenameMachineInDomain2
# NetrValidateName2
# NetrGetJoinableOUs2
# NetrAddAlternateComputerName
# NetrRemoveAlternateComputerName
# NetrSetPrimaryComputerName
# NetrEnumerateComputerNames
#
#  Not yet:
#
# Shouldn't dump errors against a win7
#
################################################################################
29
30import unittest
31import ConfigParser
32
33from impacket.dcerpc.v5 import transport
34from impacket.dcerpc.v5 import wkst
35from impacket.dcerpc.v5.ndr import NULL
36
37
class WKSTTests(unittest.TestCase):
    """Live tests for the calls exposed by impacket's wkst module.

    Most calls are exercised twice: once by filling in the raw request
    structure and sending it with dce.request(), and once through the
    matching wkst.h* helper.  Every test opens its own connection through
    connect(), so concrete subclasses must set, in setUp(), the attributes
    connect() reads: stringBinding, username, password, domain, hashes
    and ts.

    Test methods are described with comments rather than docstrings so that
    unittest keeps reporting them by method name.
    """

    # Value of self.ts that selects NDR64; the NetrUse* tests stop early
    # when it is in use.
    _NDR64_TS = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')

    # Placeholder ServerName sent with most raw requests: ten NUL characters.
    _SERVER_NAME = '\x00' * 10

    def connect(self):
        """Connect to self.stringBinding and bind to the WKST interface.

        Credentials are applied only when the transport supports them.
        self.hashes is either empty or an 'LMHASH:NTHASH' pair.

        Returns:
            A (dce, rpctransport) tuple; dce is already connected and bound
            using the transfer syntax in self.ts.
        """
        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
        if self.hashes:
            lmhash, nthash = self.hashes.split(':')
        else:
            lmhash = nthash = ''
        if hasattr(rpctransport, 'set_credentials'):
            # This method exists only for selected protocol sequences.
            rpctransport.set_credentials(self.username, self.password,
                                         self.domain, lmhash, nthash)
        dce = rpctransport.get_dce_rpc()
        dce.connect()
        dce.bind(wkst.MSRPC_UUID_WKST, transfer_syntax=self.ts)

        return dce, rpctransport

    def _dump_tolerating(self, tolerated, call, *args):
        """Run call(*args) and dump its response, allowing known errors.

        Args:
            tolerated: tuple of substrings; an exception whose text contains
                any of them is ignored.
            call: callable returning an object with a dump() method.
            *args: positional arguments forwarded to call.

        Raises:
            Whatever call() or dump() raised, when its text matches none of
            the tolerated substrings.
        """
        try:
            call(*args).dump()
        except Exception as e:
            message = str(e)
            if not any(text in message for text in tolerated):
                raise

    def _dump_best_effort(self, expected, call, *args):
        """Run call(*args) and dump its response without ever failing.

        The NetrUse* tests treat each step as best effort.  An error whose
        text contains expected is ignored quietly; any other error is still
        tolerated, but it is printed instead of being silently discarded.
        """
        try:
            call(*args).dump()
        except Exception as e:
            if expected not in str(e):
                print('Ignoring unexpected error (expected %s): %s'
                      % (expected, e))

    def test_NetrWkstaGetInfo(self):
        # Raw NetrWkstaGetInfo at levels 100, 101, 102 and 502.
        dce, _ = self.connect()
        request = wkst.NetrWkstaGetInfo()
        request['ServerName'] = self._SERVER_NAME
        for level in (100, 101, 102, 502):
            request['Level'] = level
            dce.request(request).dump()

    def test_hNetrWkstaGetInfo(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        for level in (100, 101, 102, 502):
            wkst.hNetrWkstaGetInfo(dce, level).dump()

    def test_NetrWkstaUserEnum(self):
        # Raw NetrWkstaUserEnum at levels 0 and 1; the union tag follows
        # the level.
        dce, _ = self.connect()
        request = wkst.NetrWkstaUserEnum()
        request['ServerName'] = self._SERVER_NAME
        request['PreferredMaximumLength'] = 8192
        for level in (0, 1):
            request['UserInfo']['Level'] = level
            request['UserInfo']['WkstaUserInfo']['tag'] = level
            dce.request(request).dump()

    def test_hNetrWkstaUserEnum(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        for level in (0, 1):
            wkst.hNetrWkstaUserEnum(dce, level).dump()

    def test_NetrWkstaTransportEnum(self):
        # Raw NetrWkstaTransportEnum at level 0.
        dce, _ = self.connect()
        request = wkst.NetrWkstaTransportEnum()
        request['ServerName'] = self._SERVER_NAME
        request['TransportInfo']['Level'] = 0
        request['TransportInfo']['WkstaTransportInfo']['tag'] = 0
        request['PreferredMaximumLength'] = 500
        request['ResumeHandle'] = NULL
        dce.request(request).dump()

    def test_hNetrWkstaTransportEnum(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        wkst.hNetrWkstaTransportEnum(dce, 0).dump()

    def test_NetrWkstaSetInfo(self):
        # Set wki502_dormant_file_limit to 500 with a raw level-502
        # NetrWkstaSetInfo, read it back, then restore the previous value.
        dce, _ = self.connect()
        field = 'wki502_dormant_file_limit'

        get_req = wkst.NetrWkstaGetInfo()
        get_req['ServerName'] = self._SERVER_NAME
        get_req['Level'] = 502
        resp = dce.request(get_req)
        resp.dump()
        old_value = resp['WkstaInfo']['WkstaInfo502'][field]

        set_req = wkst.NetrWkstaSetInfo()
        set_req['ServerName'] = self._SERVER_NAME
        set_req['Level'] = 502
        set_req['WkstaInfo'] = resp['WkstaInfo']
        set_req['WkstaInfo']['WkstaInfo502'][field] = 500
        dce.request(set_req).dump()

        try:
            resp = dce.request(get_req)
            self.assertEqual(500, resp['WkstaInfo']['WkstaInfo502'][field])
        finally:
            # Put the previous limit back even when the check above fails,
            # so a failing run does not leave the target modified.
            set_req['WkstaInfo']['WkstaInfo502'][field] = old_value
            dce.request(set_req).dump()

    def test_hNetrWkstaSetInfo(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        field = 'wki502_dormant_file_limit'

        resp = wkst.hNetrWkstaGetInfo(dce, 502)
        resp.dump()
        old_value = resp['WkstaInfo']['WkstaInfo502'][field]

        resp['WkstaInfo']['WkstaInfo502'][field] = 500
        wkst.hNetrWkstaSetInfo(dce, 502, resp['WkstaInfo']['WkstaInfo502']).dump()

        try:
            resp = wkst.hNetrWkstaGetInfo(dce, 502)
            resp.dump()
            self.assertEqual(500, resp['WkstaInfo']['WkstaInfo502'][field])
        finally:
            # Restore the previous limit whether or not the check passed.
            resp['WkstaInfo']['WkstaInfo502'][field] = old_value
            wkst.hNetrWkstaSetInfo(dce, 502, resp['WkstaInfo']['WkstaInfo502']).dump()

    def test_NetrWkstaTransportAdd(self):
        # Raw NetrWkstaTransportAdd; ERROR_INVALID_FUNCTION is accepted.
        dce, _ = self.connect()

        req = wkst.NetrWkstaTransportAdd()
        req['ServerName'] = self._SERVER_NAME
        req['Level'] = 0
        req['TransportInfo']['wkti0_transport_name'] = 'BETO\x00'
        req['TransportInfo']['wkti0_transport_address'] = '000C29BC5CE5\x00'
        self._dump_tolerating(('ERROR_INVALID_FUNCTION',), dce.request, req)

    def test_hNetrUseAdd_hNetrUseDel_hNetrUseGetInfo_hNetrUseEnum(self):
        # Add, enumerate, query and delete the Z: use through the helpers.
        # Every step is best effort (see _dump_best_effort).
        dce, _ = self.connect()

        info1 = wkst.LPUSE_INFO_1()
        info1['ui1_local'] = 'Z:\x00'
        info1['ui1_remote'] = '\\\\127.0.0.1\\c$\x00'
        info1['ui1_password'] = NULL
        # Access denied could happen in newer OSes.
        self._dump_best_effort('rpc_s_access_denied',
                               wkst.hNetrUseAdd, dce, 1, info1)

        # We're not testing this call with NDR64, it fails and I can't see the contents
        if self.ts == self._NDR64_TS:
            return

        # A disconnected pipe could happen in newer OSes.
        self._dump_best_effort('STATUS_PIPE_DISCONNECTED',
                               wkst.hNetrUseEnum, dce, 2)
        self._dump_best_effort('STATUS_PIPE_DISCONNECTED',
                               wkst.hNetrUseGetInfo, dce, 'Z:', 3)
        self._dump_best_effort('STATUS_PIPE_DISCONNECTED',
                               wkst.hNetrUseDel, dce, 'Z:')

    def test_NetrUseAdd_NetrUseDel_NetrUseGetInfo_NetrUseEnum(self):
        # Raw-request variant of the test above.  Every step is best effort
        # (see _dump_best_effort); access denied could happen in newer OSes.
        dce, _ = self.connect()

        req = wkst.NetrUseAdd()
        req['ServerName'] = self._SERVER_NAME
        req['Level'] = 1
        req['InfoStruct']['tag'] = 1
        req['InfoStruct']['UseInfo1']['ui1_local'] = 'Z:\x00'
        req['InfoStruct']['UseInfo1']['ui1_remote'] = '\\\\127.0.0.1\\c$\x00'
        req['InfoStruct']['UseInfo1']['ui1_password'] = NULL
        self._dump_best_effort('rpc_s_access_denied', dce.request, req)

        # We're not testing this call with NDR64, it fails and I can't see the contents
        if self.ts == self._NDR64_TS:
            return

        req = wkst.NetrUseEnum()
        req['ServerName'] = NULL
        req['InfoStruct']['Level'] = 2
        req['InfoStruct']['UseInfo']['tag'] = 2
        req['InfoStruct']['UseInfo']['Level2']['Buffer'] = NULL
        req['PreferredMaximumLength'] = 0xffffffff
        req['ResumeHandle'] = NULL
        self._dump_best_effort('rpc_s_access_denied', dce.request, req)

        req = wkst.NetrUseGetInfo()
        req['ServerName'] = self._SERVER_NAME
        req['UseName'] = 'Z:\x00'
        req['Level'] = 3
        self._dump_best_effort('rpc_s_access_denied', dce.request, req)

        req = wkst.NetrUseDel()
        req['ServerName'] = self._SERVER_NAME
        req['UseName'] = 'Z:\x00'
        req['ForceLevel'] = wkst.USE_LOTS_OF_FORCE
        self._dump_best_effort('rpc_s_access_denied', dce.request, req)

    def test_NetrWorkstationStatisticsGet(self):
        # Raw NetrWorkstationStatisticsGet; ERROR_INVALID_PARAMETER is
        # accepted.
        dce, _ = self.connect()

        req = wkst.NetrWorkstationStatisticsGet()
        req['ServerName'] = self._SERVER_NAME
        req['ServiceName'] = '\x00'
        req['Level'] = 0
        req['Options'] = 0
        self._dump_tolerating(('ERROR_INVALID_PARAMETER',), dce.request, req)

    def test_hNetrWorkstationStatisticsGet(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_INVALID_PARAMETER',),
                              wkst.hNetrWorkstationStatisticsGet,
                              dce, '\x00', 0, 0)

    def test_NetrGetJoinInformation(self):
        # Raw NetrGetJoinInformation; ERROR_INVALID_PARAMETER is accepted.
        dce, _ = self.connect()

        req = wkst.NetrGetJoinInformation()
        req['ServerName'] = self._SERVER_NAME
        req['NameBuffer'] = '\x00'
        self._dump_tolerating(('ERROR_INVALID_PARAMETER',), dce.request, req)

    def test_hNetrGetJoinInformation(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_INVALID_PARAMETER',),
                              wkst.hNetrGetJoinInformation, dce, '\x00')

    def test_NetrJoinDomain2(self):
        # Raw NetrJoinDomain2 with an all-NUL password buffer;
        # ERROR_INVALID_PASSWORD is accepted.
        dce, _ = self.connect()

        req = wkst.NetrJoinDomain2()
        req['ServerName'] = self._SERVER_NAME
        req['DomainNameParam'] = '172.16.123.1\\FREEFLY\x00'
        req['MachineAccountOU'] = 'OU=BETUS,DC=FREEFLY\x00'
        req['AccountName'] = NULL
        req['Password']['Buffer'] = '\x00'*512
        req['Options'] = wkst.NETSETUP_DOMAIN_JOIN_IF_JOINED
        self._dump_tolerating(('ERROR_INVALID_PASSWORD',), dce.request, req)

    def test_hNetrJoinDomain2(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_INVALID_PASSWORD',),
                              wkst.hNetrJoinDomain2, dce,
                              '172.16.123.1\\FREEFLY\x00',
                              'OU=BETUS,DC=FREEFLY\x00', NULL, '\x00'*512,
                              wkst.NETSETUP_DOMAIN_JOIN_IF_JOINED)

    def test_NetrUnjoinDomain2(self):
        # Raw NetrUnjoinDomain2 with an all-NUL password buffer;
        # ERROR_INVALID_PASSWORD is accepted.
        dce, _ = self.connect()

        req = wkst.NetrUnjoinDomain2()
        req['ServerName'] = self._SERVER_NAME
        req['AccountName'] = NULL
        req['Password']['Buffer'] = '\x00'*512
        req['Options'] = wkst.NETSETUP_ACCT_DELETE
        self._dump_tolerating(('ERROR_INVALID_PASSWORD',), dce.request, req)

    def test_hNetrUnjoinDomain2(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_INVALID_PASSWORD',),
                              wkst.hNetrUnjoinDomain2, dce,
                              NULL, '\x00'*512, wkst.NETSETUP_ACCT_DELETE)

    def test_NetrRenameMachineInDomain2(self):
        # Raw NetrRenameMachineInDomain2 with an all-NUL password buffer;
        # ERROR_INVALID_PASSWORD is accepted.
        dce, _ = self.connect()

        req = wkst.NetrRenameMachineInDomain2()
        req['ServerName'] = self._SERVER_NAME
        req['MachineName'] = 'BETUS\x00'
        req['AccountName'] = NULL
        req['Password']['Buffer'] = '\x00'*512
        req['Options'] = wkst.NETSETUP_ACCT_CREATE
        self._dump_tolerating(('ERROR_INVALID_PASSWORD',), dce.request, req)

    def test_hNetrRenameMachineInDomain2(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_INVALID_PASSWORD',),
                              wkst.hNetrRenameMachineInDomain2, dce,
                              'BETUS\x00', NULL, '\x00'*512,
                              wkst.NETSETUP_ACCT_CREATE)

    def test_NetrValidateName2(self):
        # Raw NetrValidateName2; an error mentioning 0x8001011c is accepted.
        dce, _ = self.connect()

        req = wkst.NetrValidateName2()
        req['ServerName'] = self._SERVER_NAME
        req['NameToValidate'] = 'BETO\x00'
        req['AccountName'] = NULL
        req['Password'] = NULL
        req['NameType'] = wkst.NETSETUP_NAME_TYPE.NetSetupDomain
        self._dump_tolerating(('0x8001011c',), dce.request, req)

    def test_hNetrValidateName2(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('0x8001011c',),
                              wkst.hNetrValidateName2, dce,
                              'BETO\x00', NULL, NULL,
                              wkst.NETSETUP_NAME_TYPE.NetSetupDomain)

    def test_NetrGetJoinableOUs2(self):
        # Raw NetrGetJoinableOUs2; an error mentioning 0x8001011c is
        # accepted.
        dce, _ = self.connect()

        req = wkst.NetrGetJoinableOUs2()
        req['ServerName'] = self._SERVER_NAME
        req['DomainNameParam'] = 'FREEFLY\x00'
        req['AccountName'] = NULL
        req['Password'] = NULL
        req['OUCount'] = 0
        self._dump_tolerating(('0x8001011c',), dce.request, req)

    def test_hNetrGetJoinableOUs2(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('0x8001011c',),
                              wkst.hNetrGetJoinableOUs2, dce,
                              'FREEFLY\x00', NULL, NULL, 0)

    def test_NetrAddAlternateComputerName(self):
        # Raw NetrAddAlternateComputerName; ERROR_NOT_SUPPORTED and
        # ERROR_INVALID_PASSWORD are accepted.
        dce, _ = self.connect()

        req = wkst.NetrAddAlternateComputerName()
        req['ServerName'] = self._SERVER_NAME
        req['AlternateName'] = 'FREEFLY\x00'
        req['DomainAccount'] = NULL
        req['EncryptedPassword'] = NULL
        self._dump_tolerating(('ERROR_NOT_SUPPORTED', 'ERROR_INVALID_PASSWORD'),
                              dce.request, req)

    def test_hNetrAddAlternateComputerName(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_NOT_SUPPORTED', 'ERROR_INVALID_PASSWORD'),
                              wkst.hNetrAddAlternateComputerName, dce,
                              'FREEFLY\x00', NULL, NULL)

    def test_NetrRemoveAlternateComputerName(self):
        # Raw NetrRemoveAlternateComputerName; ERROR_NOT_SUPPORTED and
        # ERROR_INVALID_PASSWORD are accepted.
        dce, _ = self.connect()

        req = wkst.NetrRemoveAlternateComputerName()
        req['ServerName'] = self._SERVER_NAME
        req['AlternateName'] = 'FREEFLY\x00'
        req['DomainAccount'] = NULL
        req['EncryptedPassword'] = NULL
        self._dump_tolerating(('ERROR_NOT_SUPPORTED', 'ERROR_INVALID_PASSWORD'),
                              dce.request, req)

    def test_hNetrRemoveAlternateComputerName(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_NOT_SUPPORTED', 'ERROR_INVALID_PASSWORD'),
                              wkst.hNetrRemoveAlternateComputerName, dce,
                              'FREEFLY\x00', NULL, NULL)

    def test_NetrSetPrimaryComputerName(self):
        # Raw NetrSetPrimaryComputerName; ERROR_NOT_SUPPORTED and
        # ERROR_INVALID_PARAMETER are accepted.
        dce, _ = self.connect()

        req = wkst.NetrSetPrimaryComputerName()
        req['ServerName'] = self._SERVER_NAME
        req['PrimaryName'] = 'FREEFLY\x00'
        req['DomainAccount'] = NULL
        req['EncryptedPassword'] = NULL
        self._dump_tolerating(('ERROR_NOT_SUPPORTED', 'ERROR_INVALID_PARAMETER'),
                              dce.request, req)

    def test_hNetrSetPrimaryComputerName(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_NOT_SUPPORTED', 'ERROR_INVALID_PARAMETER'),
                              wkst.hNetrSetPrimaryComputerName, dce,
                              'FREEFLY\x00', NULL, NULL)

    def test_NetrEnumerateComputerNames(self):
        # Raw NetrEnumerateComputerNames for all name types;
        # ERROR_NOT_SUPPORTED is accepted.
        dce, _ = self.connect()

        req = wkst.NetrEnumerateComputerNames()
        req['ServerName'] = self._SERVER_NAME
        req['NameType'] = wkst.NET_COMPUTER_NAME_TYPE.NetAllComputerNames
        self._dump_tolerating(('ERROR_NOT_SUPPORTED',), dce.request, req)

    def test_hNetrEnumerateComputerNames(self):
        # Helper variant of the test above.
        dce, _ = self.connect()
        self._dump_tolerating(('ERROR_NOT_SUPPORTED',),
                              wkst.hNetrEnumerateComputerNames, dce,
                              wkst.NET_COMPUTER_NAME_TYPE.NetAllComputerNames)
579
580
class SMBTransport(WKSTTests):
    """Run the WKST tests over the ncacn_np wkssvc named pipe.

    setUp() loads the target and credentials from the [SMBTransport]
    section of dcetests.cfg and selects the
    8a885d04-1ceb-11c9-9fe8-08002b104860 v2.0 transfer syntax.
    """

    def setUp(self):
        super(SMBTransport, self).setUp()
        section = 'SMBTransport'
        settings = ConfigParser.ConfigParser()
        settings.read('dcetests.cfg')
        # (attribute set on self, option read from the config section),
        # kept in the order the options have always been read.
        wanted = (
            ('username', 'username'),
            ('domain', 'domain'),
            ('serverName', 'servername'),
            ('password', 'password'),
            ('machine', 'machine'),
            ('hashes', 'hashes'),
        )
        for attribute, option in wanted:
            setattr(self, attribute, settings.get(section, option))
        self.stringBinding = r'ncacn_np:%s[\PIPE\wkssvc]' % self.machine
        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
594
class SMBTransport64(WKSTTests):
    """Run the WKST tests over the ncacn_np wkssvc named pipe using NDR64.

    setUp() loads the target and credentials from the [SMBTransport]
    section of dcetests.cfg (the same section SMBTransport uses) and selects
    the 71710533-BEBA-4937-8319-B5DBEF9CCC36 v1.0 transfer syntax.
    """

    def setUp(self):
        super(SMBTransport64, self).setUp()
        section = 'SMBTransport'
        settings = ConfigParser.ConfigParser()
        settings.read('dcetests.cfg')
        # (attribute set on self, option read from the config section),
        # kept in the order the options have always been read.
        wanted = (
            ('username', 'username'),
            ('domain', 'domain'),
            ('serverName', 'servername'),
            ('password', 'password'),
            ('machine', 'machine'),
            ('hashes', 'hashes'),
        )
        for attribute, option in wanted:
            setattr(self, attribute, settings.get(section, option))
        self.stringBinding = r'ncacn_np:%s[\PIPE\wkssvc]' % self.machine
        self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')
608
# Process command-line arguments.
if __name__ == '__main__':
    import sys
    loader = unittest.TestLoader()
    if len(sys.argv) > 1:
        # The optional first argument names the TestCase class to run.
        testcase = sys.argv[1]
        suite = loader.loadTestsFromTestCase(globals()[testcase])
    else:
        # Default: run both transfer-syntax variants.
        suite = loader.loadTestsFromTestCase(SMBTransport)
        suite.addTests(loader.loadTestsFromTestCase(SMBTransport64))
    result = unittest.TextTestRunner(verbosity=1).run(suite)
    # Report failures through the exit status so callers (shell scripts,
    # CI) can detect them; previously the script always exited with 0.
    sys.exit(0 if result.wasSuccessful() else 1)
619