import errno
import os
import select
import socket
import unittest

import ConfigParser
from binascii import unhexlify

from impacket.smbconnection import SMBConnection, smb
from impacket.smb3structs import *
from impacket import nt_errors, nmb

# IMPORTANT NOTE:
# For some reason, under Windows 8, you cannot switch between
# dialects 002, 2_1 and 3_0 (it will throw STATUS_USER_SESSION_DELETED),
# but you can with SMB1.
# So, you can't run all test cases against the same machine.
# Usually running all the tests, except SMB3, against a Windows 7 machine
# would do the trick.
# ToDo:
# [ ] Add the rest of SMBConnection public methods


class SMBTests(unittest.TestCase):
    """Dialect-independent SMBConnection test cases.

    This base class does not configure itself: concrete subclasses are
    expected to populate, in their ``setUp``, the attributes the tests read
    (``username``, ``password``, ``domain``, ``serverName``, ``machine``,
    ``hashes``, ``aesKey``, ``share``, ``file``, ``directory``, ``upload``,
    ``dialects``, ``sessPort`` and, for SMB1, ``flags2``).
    """

    # Configuration file and section that hold the target machine settings.
    CONFIG_FILE = 'dcetests.cfg'
    CONFIG_SECTION = 'SMBTransport'

    def _load_config(self):
        """Read the shared target settings from CONFIG_FILE into ``self``.

        Sets the credentials/target attributes taken from the config file
        plus the fixed share, remote file, remote directory and local upload
        path used by every dialect-specific subclass.
        """
        configFile = ConfigParser.ConfigParser()
        configFile.read(self.CONFIG_FILE)
        section = self.CONFIG_SECTION
        self.username = configFile.get(section, 'username')
        self.domain = configFile.get(section, 'domain')
        self.serverName = configFile.get(section, 'servername')
        self.password = configFile.get(section, 'password')
        self.machine = configFile.get(section, 'machine')
        self.hashes = configFile.get(section, 'hashes')
        self.aesKey = configFile.get(section, 'aesKey128')
        self.share = 'C$'
        self.file = '/TEST'
        self.directory = '/BETO'
        self.upload = '../../impacket/nt_errors.py'

    def create_connection(self):
        """Return a new SMBConnection to the configured target.

        For the SMB1 dialect the session is negotiated manually so that
        ``self.flags2`` can be passed; other dialects use the default
        negotiation done by the SMBConnection constructor.
        """
        if self.dialects == smb.SMB_DIALECT:
            # Only for SMB1 let's do manualNego
            s = SMBConnection(self.serverName, self.machine,
                              preferredDialect=self.dialects,
                              sess_port=self.sessPort, manualNegotiate=True)
            s.negotiateSession(self.dialects, flags2=self.flags2)
        else:
            s = SMBConnection(self.serverName, self.machine,
                              preferredDialect=self.dialects,
                              sess_port=self.sessPort)
        return s

    def _expected_credentials(self, password='', lmhash='', nthash='',
                              aesKey=''):
        """Build the tuple the tests expect ``getCredentials()`` to return."""
        return (self.username, password, self.domain, lmhash, nthash, aesKey,
                None, None)

    def _unc(self):
        """Return the UNC path (\\\\machine\\share) of the configured share."""
        return '\\\\%s\\%s' % (self.machine, self.share)

    def test_aliasconnection(self):
        """Connect using the '*SMBSERVER' alias as the remote name."""
        conn = SMBConnection('*SMBSERVER', self.machine,
                             preferredDialect=self.dialects,
                             sess_port=self.sessPort)
        conn.login(self.username, self.password, self.domain)
        conn.listPath(self.share, '*')
        conn.logoff()

    def test_reconnect(self):
        """Log off, reconnect and check the connection is usable again."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        conn.listPath(self.share, '*')
        conn.logoff()
        conn.reconnect()
        conn.listPath(self.share, '*')
        conn.logoff()

    def test_reconnectKerberosHashes(self):
        """Reconnect after a Kerberos login performed with LM/NT hashes."""
        lmhash, nthash = self.hashes.split(':')
        expected = self._expected_credentials(lmhash=unhexlify(lmhash),
                                              nthash=unhexlify(nthash))
        conn = self.create_connection()
        conn.kerberosLogin(self.username, '', self.domain, lmhash, nthash, '')
        self.assertEqual(conn.getCredentials(), expected)
        conn.connectTree(self._unc())
        conn.logoff()
        conn.reconnect()
        # The stored credentials must survive the reconnect.
        self.assertEqual(conn.getCredentials(), expected)
        conn.connectTree(self._unc())
        conn.logoff()

    def test_connectTree(self):
        """Connect to the share both by plain name and by UNC path."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        conn.connectTree(self.share)
        conn.connectTree(self._unc())
        # Close the session instead of leaving it open on the server.
        conn.logoff()

    def test_connection(self):
        """Log in with a password and check the stored credentials."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        self.assertEqual(conn.getCredentials(),
                         self._expected_credentials(password=self.password))
        conn.logoff()
        del conn

    def test_close_connection(self):
        """Check that close() closes the underlying socket."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        smb_connection_socket = conn.getSMBServer().get_socket()
        self.assertTrue(self.__is_socket_opened(smb_connection_socket))
        conn.close()
        self.assertFalse(self.__is_socket_opened(smb_connection_socket))
        del conn

    def test_manualNego(self):
        """Explicitly negotiate the session before logging in."""
        conn = self.create_connection()
        conn.negotiateSession(self.dialects)
        conn.login(self.username, self.password, self.domain)
        self.assertEqual(conn.getCredentials(),
                         self._expected_credentials(password=self.password))
        conn.logoff()
        del conn

    def test_loginHashes(self):
        """Log in with LM/NT hashes instead of a password."""
        lmhash, nthash = self.hashes.split(':')
        conn = self.create_connection()
        conn.login(self.username, '', self.domain, lmhash, nthash)
        self.assertEqual(
            conn.getCredentials(),
            self._expected_credentials(lmhash=unhexlify(lmhash),
                                       nthash=unhexlify(nthash)))
        conn.logoff()

    def test_loginKerberosHashes(self):
        """Kerberos login with LM/NT hashes, then connect to the share."""
        lmhash, nthash = self.hashes.split(':')
        conn = self.create_connection()
        conn.kerberosLogin(self.username, '', self.domain, lmhash, nthash, '')
        self.assertEqual(
            conn.getCredentials(),
            self._expected_credentials(lmhash=unhexlify(lmhash),
                                       nthash=unhexlify(nthash)))
        conn.connectTree(self._unc())
        conn.logoff()

    def test_loginKerberos(self):
        """Kerberos login with a password, then connect to the share."""
        conn = self.create_connection()
        conn.kerberosLogin(self.username, self.password, self.domain,
                           '', '', '')
        self.assertEqual(conn.getCredentials(),
                         self._expected_credentials(password=self.password))
        conn.connectTree(self._unc())
        conn.logoff()

    def test_loginKerberosAES(self):
        """Kerberos login with an AES key, then connect to the share."""
        conn = self.create_connection()
        conn.kerberosLogin(self.username, '', self.domain, '', '',
                           self.aesKey)
        self.assertEqual(conn.getCredentials(),
                         self._expected_credentials(aesKey=self.aesKey))
        conn.connectTree(self._unc())
        conn.logoff()

    def test_listPath(self):
        """List the root of the configured share."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        conn.listPath(self.share, '*')
        conn.logoff()

    def test_createFile(self):
        """Create a remote file, rename it and delete the renamed file."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        tid = conn.connectTree(self.share)
        fid = conn.createFile(tid, self.file)
        conn.closeFile(tid, fid)
        conn.rename(self.share, self.file, self.file + '.bak')
        conn.deleteFile(self.share, self.file + '.bak')
        conn.disconnectTree(tid)
        conn.logoff()

    def test_readwriteFile(self):
        """Write 65535 bytes to a remote file and read them back."""
        size = 65535
        payload = "A" * size
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        tid = conn.connectTree(self.share)
        fid = conn.createFile(tid, self.file)
        conn.writeFile(tid, fid, payload)
        data = ''
        while len(data) < size:
            # Continue from where the previous (possibly short) read stopped
            # instead of re-reading from the start of the file.
            data += conn.readFile(tid, fid, len(data), size - len(data))
        self.assertEqual(len(data), size)
        # assertTrue rather than assertEqual: avoids a 64 KiB diff on failure.
        self.assertTrue(data == payload)
        conn.closeFile(tid, fid)
        fid = conn.openFile(tid, self.file)
        conn.closeFile(tid, fid)
        conn.deleteFile(self.share, self.file)
        conn.disconnectTree(tid)

        conn.logoff()

    def test_createdeleteDirectory(self):
        """Create/delete a directory, including the non-empty case."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        conn.createDirectory(self.share, self.directory)
        conn.deleteDirectory(self.share, self.directory)
        conn.createDirectory(self.share, self.directory)
        nested_dir = "%s\\%s" % (self.directory, self.directory)
        conn.createDirectory(self.share, nested_dir)
        try:
            conn.deleteDirectory(self.share, self.directory)
        except Exception as e:
            # Only the "directory not empty" failure is tolerated here; any
            # other error (or an exception without an ``error`` attribute)
            # is propagated instead of being silently swallowed.
            if getattr(e, 'error', None) != nt_errors.STATUS_DIRECTORY_NOT_EMPTY:
                raise
            conn.deleteDirectory(self.share, nested_dir)
        conn.deleteDirectory(self.share, self.directory)
        conn.logoff()

    def test_getData(self):
        """Smoke-test the informational getters after a login."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        conn.getDialect()
        conn.getServerName()
        conn.getRemoteHost()
        conn.getServerDomain()
        conn.getServerOS()
        conn.doesSupportNTLMv2()
        conn.isLoginRequired()
        conn.logoff()

    def test_getServerName(self):
        """The reported server name matches the configured one (any case)."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        serverName = conn.getServerName()
        self.assertEqual(serverName.upper(), self.serverName.upper())
        conn.logoff()

    def test_getServerDNSDomainName(self):
        """The reported DNS domain matches the configured domain (any case)."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        serverDomain = conn.getServerDNSDomainName()
        self.assertEqual(serverDomain.upper(), self.domain.upper())
        conn.logoff()

    def test_getServerDomain(self):
        """The reported domain matches the first label of the configured one."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        serverDomain = conn.getServerDomain()
        self.assertEqual(serverDomain.upper(),
                         self.domain.upper().split('.')[0])
        conn.logoff()

    def test_getRemoteHost(self):
        """The reported remote host equals the configured machine."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        remoteHost = conn.getRemoteHost()
        self.assertEqual(remoteHost, self.machine)
        conn.logoff()

    def test_getDialect(self):
        """The negotiated dialect equals the requested one."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        dialect = conn.getDialect()
        self.assertEqual(dialect, self.dialects)
        conn.logoff()

    def test_uploadDownload(self):
        """Upload a local file, download it again and clean up both copies."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        download = self.upload + '2'
        with open(self.upload) as f:
            conn.putFile(self.share, self.file, f.read)
        try:
            with open(download, 'w+') as f:
                conn.getFile(self.share, self.file, f.write)
        finally:
            # Remove the local copy even when the download fails.
            if os.path.exists(download):
                os.unlink(download)
        conn.deleteFile(self.share, self.file)
        conn.logoff()

    def test_listShares(self):
        """Enumerate the shares exported by the target."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        conn.listShares()
        conn.logoff()

    def test_getSessionKey(self):
        """Retrieve the session key after a login."""
        conn = self.create_connection()
        conn.login(self.username, self.password, self.domain)
        conn.getSessionKey()
        conn.logoff()

    def __is_socket_opened(self, s):
        """Return False if select() on ``s`` fails with EBADF, True otherwise.

        We assume that if the socket is selectable, it's open; and if it is
        not, it's closed.
        Note: this method is accurate as long as the file descriptor used for
        the socket is not re-used.
        """
        try:
            select.select([s], [], [], 0)
        except socket.error as e:
            if e.args[0] == errno.EBADF:
                return False
        return True


class SMB1Tests(SMBTests):
    """SMB1 over the direct SMB session port, without Unicode."""

    def setUp(self):
        SMBTests.setUp(self)
        # Put specific configuration for target machine with SMB1
        self._load_config()
        self.flags2 = (smb.SMB.FLAGS2_NT_STATUS |
                       smb.SMB.FLAGS2_EXTENDED_SECURITY |
                       smb.SMB.FLAGS2_LONG_NAMES)
        self.dialects = smb.SMB_DIALECT
        self.sessPort = nmb.SMB_SESSION_PORT


class SMB1TestsNetBIOS(SMBTests):
    """SMB1 over the NetBIOS session port, without Unicode."""

    def setUp(self):
        SMBTests.setUp(self)
        # Put specific configuration for target machine with SMB1
        self._load_config()
        self.flags2 = (smb.SMB.FLAGS2_NT_STATUS |
                       smb.SMB.FLAGS2_EXTENDED_SECURITY |
                       smb.SMB.FLAGS2_LONG_NAMES)
        self.dialects = smb.SMB_DIALECT
        self.sessPort = nmb.NETBIOS_SESSION_PORT


class SMB1TestsUnicode(SMBTests):
    """SMB1 over the direct SMB session port, with FLAGS2_UNICODE set."""

    def setUp(self):
        SMBTests.setUp(self)
        # Put specific configuration for target machine with SMB1
        self._load_config()
        self.flags2 = (smb.SMB.FLAGS2_UNICODE |
                       smb.SMB.FLAGS2_NT_STATUS |
                       smb.SMB.FLAGS2_EXTENDED_SECURITY |
                       smb.SMB.FLAGS2_LONG_NAMES)
        self.dialects = smb.SMB_DIALECT
        self.sessPort = nmb.SMB_SESSION_PORT


class SMB002Tests(SMBTests):
    """SMB2 dialect 002 over the direct SMB session port."""

    def setUp(self):
        # Put specific configuration for target machine with SMB_002
        SMBTests.setUp(self)
        self._load_config()
        self.dialects = SMB2_DIALECT_002
        self.sessPort = nmb.SMB_SESSION_PORT


class SMB21Tests(SMBTests):
    """SMB2 dialect 2.1 over the direct SMB session port."""

    def setUp(self):
        # Put specific configuration for target machine with SMB 2.1
        SMBTests.setUp(self)
        self._load_config()
        self.dialects = SMB2_DIALECT_21
        self.sessPort = nmb.SMB_SESSION_PORT


class SMB3Tests(SMBTests):
    """SMB3 dialect 3.0 over the direct SMB session port."""

    def setUp(self):
        # Put specific configuration for target machine with SMB3
        SMBTests.setUp(self)
        self._load_config()
        self.dialects = SMB2_DIALECT_30
        self.sessPort = nmb.SMB_SESSION_PORT


if __name__ == "__main__":
    suite = unittest.TestLoader().loadTestsFromTestCase(SMB1Tests)
    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB1TestsNetBIOS))
    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB1TestsUnicode))
    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB002Tests))
    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB21Tests))
    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB3Tests))
    unittest.TextTestRunner(verbosity=1).run(suite)