1############################################################################### 2# Tested so far: 3# 4# Not yet: 5# 6# Shouldn't dump errors against a win7 7# 8################################################################################ 9 10import unittest 11import ConfigParser 12 13from impacket.dcerpc.v5 import transport 14from impacket.dcerpc.v5 import mgmt 15 16 17class MGMTTests(unittest.TestCase): 18 def connect(self): 19 rpctransport = transport.DCERPCTransportFactory(self.stringBinding) 20 if len(self.hashes) > 0: 21 lmhash, nthash = self.hashes.split(':') 22 else: 23 lmhash = '' 24 nthash = '' 25 if hasattr(rpctransport, 'set_credentials'): 26 # This method exists only for selected protocol sequences. 27 rpctransport.set_credentials(self.username,self.password, self.domain, lmhash, nthash) 28 dce = rpctransport.get_dce_rpc() 29 dce.connect() 30 dce.bind(mgmt.MSRPC_UUID_MGMT, transfer_syntax = self.ts) 31 32 return dce, rpctransport 33 34 def test_inq_if_ids(self): 35 dce, transport = self.connect() 36 37 request = mgmt.inq_if_ids() 38 resp = dce.request(request) 39 resp.dump() 40 #for i in range(resp['if_id_vector']['count']): 41 # print bin_to_uuidtup(resp['if_id_vector']['if_id'][i]['Data'].getData()) 42 # print 43 44 def test_hinq_if_ids(self): 45 dce, transport = self.connect() 46 47 resp = mgmt.hinq_if_ids(dce) 48 resp.dump() 49 50 def test_inq_stats(self): 51 dce, transport = self.connect() 52 53 request = mgmt.inq_stats() 54 request['count'] = 40 55 resp = dce.request(request) 56 resp.dump() 57 58 def test_hinq_stats(self): 59 dce, transport = self.connect() 60 61 resp = mgmt.hinq_stats(dce) 62 resp.dump() 63 64 def test_is_server_listening(self): 65 dce, transport = self.connect() 66 67 request = mgmt.is_server_listening() 68 resp = dce.request(request, checkError=False) 69 resp.dump() 70 71 def test_his_server_listening(self): 72 dce, transport = self.connect() 73 74 resp = mgmt.his_server_listening(dce) 75 resp.dump() 76 77 def test_stop_server_listening(self): 78 dce, transport = self.connect() 79 80 request = mgmt.stop_server_listening() 81 try: 82 resp = dce.request(request) 83 resp.dump() 84 except Exception, e: 85 if str(e).find('rpc_s_access_denied') < 0: 86 raise 87 88 def test_hstop_server_listening(self): 89 dce, transport = self.connect() 90 91 try: 92 resp = mgmt.hstop_server_listening(dce) 93 resp.dump() 94 except Exception, e: 95 if str(e).find('rpc_s_access_denied') < 0: 96 raise 97 98 def test_inq_princ_name(self): 99 dce, transport = self.connect() 100 101 request = mgmt.inq_princ_name() 102 request['authn_proto'] = 0 103 request['princ_name_size'] = 32 104 resp = dce.request(request, checkError=False) 105 resp.dump() 106 107 def test_his_server_listening(self): 108 dce, transport = self.connect() 109 110 resp = mgmt.hinq_princ_name(dce) 111 resp.dump() 112 113 114class SMBTransport(MGMTTests): 115 def setUp(self): 116 MGMTTests.setUp(self) 117 configFile = ConfigParser.ConfigParser() 118 configFile.read('dcetests.cfg') 119 self.username = configFile.get('SMBTransport', 'username') 120 self.domain = configFile.get('SMBTransport', 'domain') 121 self.serverName = configFile.get('SMBTransport', 'servername') 122 self.password = configFile.get('SMBTransport', 'password') 123 self.machine = configFile.get('SMBTransport', 'machine') 124 self.hashes = configFile.get('SMBTransport', 'hashes') 125 self.stringBinding = r'ncacn_np:%s[\pipe\epmapper]' % self.machine 126 self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0') 127 128class TCPTransport(MGMTTests): 129 def setUp(self): 130 MGMTTests.setUp(self) 131 configFile = ConfigParser.ConfigParser() 132 configFile.read('dcetests.cfg') 133 self.username = configFile.get('TCPTransport', 'username') 134 self.domain = configFile.get('TCPTransport', 'domain') 135 self.serverName = configFile.get('TCPTransport', 'servername') 136 self.password = configFile.get('TCPTransport', 'password') 137 self.machine = configFile.get('TCPTransport', 'machine') 138 self.hashes = configFile.get('TCPTransport', 'hashes') 139 self.stringBinding = r'ncacn_ip_tcp:%s[135]' % self.machine 140 self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0') 141 142class SMBTransport64(MGMTTests): 143 def setUp(self): 144 MGMTTests.setUp(self) 145 configFile = ConfigParser.ConfigParser() 146 configFile.read('dcetests.cfg') 147 self.username = configFile.get('SMBTransport', 'username') 148 self.domain = configFile.get('SMBTransport', 'domain') 149 self.serverName = configFile.get('SMBTransport', 'servername') 150 self.password = configFile.get('SMBTransport', 'password') 151 self.machine = configFile.get('SMBTransport', 'machine') 152 self.hashes = configFile.get('SMBTransport', 'hashes') 153 self.stringBinding = r'ncacn_np:%s[\pipe\epmapper]' % self.machine 154 self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0') 155 156class TCPTransport64(MGMTTests): 157 def setUp(self): 158 MGMTTests.setUp(self) 159 configFile = ConfigParser.ConfigParser() 160 configFile.read('dcetests.cfg') 161 self.username = configFile.get('TCPTransport', 'username') 162 self.domain = configFile.get('TCPTransport', 'domain') 163 self.serverName = configFile.get('TCPTransport', 'servername') 164 self.password = configFile.get('TCPTransport', 'password') 165 self.machine = configFile.get('TCPTransport', 'machine') 166 self.hashes = configFile.get('TCPTransport', 'hashes') 167 self.stringBinding = r'ncacn_ip_tcp:%s[135]' % self.machine 168 self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0') 169 170 171# Process command-line arguments. 172if __name__ == '__main__': 173 import sys 174 if len(sys.argv) > 1: 175 testcase = sys.argv[1] 176 suite = unittest.TestLoader().loadTestsFromTestCase(globals()[testcase]) 177 else: 178 #suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport64) 179 suite = unittest.TestLoader().loadTestsFromTestCase(SMBTransport) 180 suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TCPTransport)) 181 suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMBTransport64)) 182 suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TCPTransport64)) 183 unittest.TextTestRunner(verbosity=1).run(suite) 184