1###############################################################################
2#  Tested so far:
3#
4#  Not yet:
5#
6# Shouldn't dump errors against a win7
7#
8################################################################################
9
10import unittest
11import ConfigParser
12
13from impacket.dcerpc.v5 import transport
14from impacket.dcerpc.v5 import mgmt
15
16
17class MGMTTests(unittest.TestCase):
18    def connect(self):
19        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
20        if len(self.hashes) > 0:
21            lmhash, nthash = self.hashes.split(':')
22        else:
23            lmhash = ''
24            nthash = ''
25        if hasattr(rpctransport, 'set_credentials'):
26            # This method exists only for selected protocol sequences.
27            rpctransport.set_credentials(self.username,self.password, self.domain, lmhash, nthash)
28        dce = rpctransport.get_dce_rpc()
29        dce.connect()
30        dce.bind(mgmt.MSRPC_UUID_MGMT, transfer_syntax = self.ts)
31
32        return dce, rpctransport
33
34    def test_inq_if_ids(self):
35        dce, transport = self.connect()
36
37        request = mgmt.inq_if_ids()
38        resp = dce.request(request)
39        resp.dump()
40        #for i in range(resp['if_id_vector']['count']):
41        #    print bin_to_uuidtup(resp['if_id_vector']['if_id'][i]['Data'].getData())
42        #    print
43
44    def test_hinq_if_ids(self):
45        dce, transport = self.connect()
46
47        resp = mgmt.hinq_if_ids(dce)
48        resp.dump()
49
50    def test_inq_stats(self):
51        dce, transport = self.connect()
52
53        request = mgmt.inq_stats()
54        request['count'] = 40
55        resp = dce.request(request)
56        resp.dump()
57
58    def test_hinq_stats(self):
59        dce, transport = self.connect()
60
61        resp = mgmt.hinq_stats(dce)
62        resp.dump()
63
64    def test_is_server_listening(self):
65        dce, transport = self.connect()
66
67        request = mgmt.is_server_listening()
68        resp = dce.request(request, checkError=False)
69        resp.dump()
70
71    def test_his_server_listening(self):
72        dce, transport = self.connect()
73
74        resp = mgmt.his_server_listening(dce)
75        resp.dump()
76
77    def test_stop_server_listening(self):
78        dce, transport = self.connect()
79
80        request = mgmt.stop_server_listening()
81        try:
82            resp = dce.request(request)
83            resp.dump()
84        except Exception, e:
85            if str(e).find('rpc_s_access_denied') < 0:
86                raise
87
88    def test_hstop_server_listening(self):
89        dce, transport = self.connect()
90
91        try:
92            resp = mgmt.hstop_server_listening(dce)
93            resp.dump()
94        except Exception, e:
95            if str(e).find('rpc_s_access_denied') < 0:
96                raise
97
98    def test_inq_princ_name(self):
99        dce, transport = self.connect()
100
101        request = mgmt.inq_princ_name()
102        request['authn_proto'] = 0
103        request['princ_name_size'] = 32
104        resp = dce.request(request, checkError=False)
105        resp.dump()
106
107    def test_his_server_listening(self):
108        dce, transport = self.connect()
109
110        resp = mgmt.hinq_princ_name(dce)
111        resp.dump()
112
113
114class SMBTransport(MGMTTests):
115    def setUp(self):
116        MGMTTests.setUp(self)
117        configFile = ConfigParser.ConfigParser()
118        configFile.read('dcetests.cfg')
119        self.username = configFile.get('SMBTransport', 'username')
120        self.domain   = configFile.get('SMBTransport', 'domain')
121        self.serverName = configFile.get('SMBTransport', 'servername')
122        self.password = configFile.get('SMBTransport', 'password')
123        self.machine  = configFile.get('SMBTransport', 'machine')
124        self.hashes   = configFile.get('SMBTransport', 'hashes')
125        self.stringBinding = r'ncacn_np:%s[\pipe\epmapper]' % self.machine
126        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
127
128class TCPTransport(MGMTTests):
129    def setUp(self):
130        MGMTTests.setUp(self)
131        configFile = ConfigParser.ConfigParser()
132        configFile.read('dcetests.cfg')
133        self.username = configFile.get('TCPTransport', 'username')
134        self.domain   = configFile.get('TCPTransport', 'domain')
135        self.serverName = configFile.get('TCPTransport', 'servername')
136        self.password = configFile.get('TCPTransport', 'password')
137        self.machine  = configFile.get('TCPTransport', 'machine')
138        self.hashes   = configFile.get('TCPTransport', 'hashes')
139        self.stringBinding = r'ncacn_ip_tcp:%s[135]' % self.machine
140        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
141
142class SMBTransport64(MGMTTests):
143    def setUp(self):
144        MGMTTests.setUp(self)
145        configFile = ConfigParser.ConfigParser()
146        configFile.read('dcetests.cfg')
147        self.username = configFile.get('SMBTransport', 'username')
148        self.domain   = configFile.get('SMBTransport', 'domain')
149        self.serverName = configFile.get('SMBTransport', 'servername')
150        self.password = configFile.get('SMBTransport', 'password')
151        self.machine  = configFile.get('SMBTransport', 'machine')
152        self.hashes   = configFile.get('SMBTransport', 'hashes')
153        self.stringBinding = r'ncacn_np:%s[\pipe\epmapper]' % self.machine
154        self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')
155
156class TCPTransport64(MGMTTests):
157    def setUp(self):
158        MGMTTests.setUp(self)
159        configFile = ConfigParser.ConfigParser()
160        configFile.read('dcetests.cfg')
161        self.username = configFile.get('TCPTransport', 'username')
162        self.domain   = configFile.get('TCPTransport', 'domain')
163        self.serverName = configFile.get('TCPTransport', 'servername')
164        self.password = configFile.get('TCPTransport', 'password')
165        self.machine  = configFile.get('TCPTransport', 'machine')
166        self.hashes   = configFile.get('TCPTransport', 'hashes')
167        self.stringBinding = r'ncacn_ip_tcp:%s[135]' % self.machine
168        self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')
169
170
171# Process command-line arguments.
172if __name__ == '__main__':
173    import sys
174    if len(sys.argv) > 1:
175        testcase = sys.argv[1]
176        suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase])
177    else:
178        #suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport64)
179        suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport)
180        suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TCPTransport))
181        suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport64))
182        suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TCPTransport64))
183    unittest.TextTestRunner(verbosity=1).run(suite)
184