1import unittest
2import os
3import socket
4import select
5import errno
6
7import ConfigParser
8from binascii import unhexlify
9from impacket.smbconnection import SMBConnection, smb
10from impacket.smb3structs import *
11from impacket import nt_errors, nmb
12
13# IMPORTANT NOTE:
14# For some reason, under Windows 8, you cannot switch between
15# dialects 002, 2_1 and 3_0 (it will throw STATUS_USER_SESSION_DELETED),
16# but you can with SMB1.
17# So, you can't run all test cases against the same machine.
18# Usually running all the tests against a Windows 7 except SMB3
19# would do the trick.
20# ToDo:
21# [ ] Add the rest of SMBConnection public methods
22
23class SMBTests(unittest.TestCase):
24    def create_connection(self):
25        if self.dialects == smb.SMB_DIALECT:
26            # Only for SMB1 let's do manualNego
27            s = SMBConnection(self.serverName, self.machine, preferredDialect = self.dialects, sess_port = self.sessPort, manualNegotiate=True)
28            s.negotiateSession(self.dialects, flags2=self.flags2)
29        else:
30            s = SMBConnection(self.serverName, self.machine, preferredDialect = self.dialects, sess_port = self.sessPort)
31        return s
32
33    def test_aliasconnection(self):
34        smb = SMBConnection('*SMBSERVER', self.machine, preferredDialect=self.dialects, sess_port=self.sessPort)
35        smb.login(self.username, self.password, self.domain)
36        smb.listPath(self.share, '*')
37        smb.logoff()
38
39    def test_reconnect(self):
40        smb = self.create_connection()
41        smb.login(self.username, self.password, self.domain)
42        smb.listPath(self.share, '*')
43        smb.logoff()
44        smb.reconnect()
45        smb.listPath(self.share, '*')
46        smb.logoff()
47
48    def test_reconnectKerberosHashes(self):
49        lmhash, nthash = self.hashes.split(':')
50        smb = self.create_connection()
51        smb.kerberosLogin(self.username, '', self.domain, lmhash, nthash, '')
52        credentials = smb.getCredentials()
53        self.assertTrue( credentials == (self.username, '', self.domain, unhexlify(lmhash), unhexlify(nthash), '', None, None) )
54        UNC = '\\\\%s\\%s' % (self.machine, self.share)
55        tid = smb.connectTree(UNC)
56        smb.logoff()
57        smb.reconnect()
58        credentials = smb.getCredentials()
59        self.assertTrue(
60            credentials == (self.username, '', self.domain, unhexlify(lmhash), unhexlify(nthash), '', None, None))
61        UNC = '\\\\%s\\%s' % (self.machine, self.share)
62        tid = smb.connectTree(UNC)
63        smb.logoff()
64
65    def test_connectTree(self):
66        smb = self.create_connection()
67        smb.login(self.username, self.password, self.domain)
68        tid = smb.connectTree(self.share)
69        UNC = '\\\\%s\\%s' % (self.machine, self.share)
70        tid = smb.connectTree(UNC)
71
72    def test_connection(self):
73        smb = self.create_connection()
74        smb.login(self.username, self.password, self.domain)
75        credentials = smb.getCredentials()
76        self.assertTrue( credentials == (self.username, self.password, self.domain, '','','', None, None))
77        smb.logoff()
78        del(smb)
79
80    def test_close_connection(self):
81        smb = self.create_connection()
82        smb.login(self.username, self.password, self.domain)
83        smb_connection_socket = smb.getSMBServer().get_socket()
84        self.assertTrue(self.__is_socket_opened(smb_connection_socket) == True)
85        smb.close()
86        self.assertTrue(self.__is_socket_opened(smb_connection_socket) == False)
87        del(smb)
88
89    def test_manualNego(self):
90        smb = self.create_connection()
91        smb.negotiateSession(self.dialects)
92        smb.login(self.username, self.password, self.domain)
93        credentials = smb.getCredentials()
94        self.assertTrue( credentials == (self.username, self.password, self.domain, '','','', None, None))
95        smb.logoff()
96        del(smb)
97
98    def test_loginHashes(self):
99        lmhash, nthash = self.hashes.split(':')
100        smb = self.create_connection()
101        smb.login(self.username, '', self.domain, lmhash, nthash)
102        credentials = smb.getCredentials()
103        self.assertTrue( credentials == (self.username, '', self.domain, unhexlify(lmhash), unhexlify(nthash), '', None, None) )
104        smb.logoff()
105
106    def test_loginKerberosHashes(self):
107        lmhash, nthash = self.hashes.split(':')
108        smb = self.create_connection()
109        smb.kerberosLogin(self.username, '', self.domain, lmhash, nthash, '')
110        credentials = smb.getCredentials()
111        self.assertTrue( credentials == (self.username, '', self.domain, unhexlify(lmhash), unhexlify(nthash), '', None, None) )
112        UNC = '\\\\%s\\%s' % (self.machine, self.share)
113        tid = smb.connectTree(UNC)
114        smb.logoff()
115
116    def test_loginKerberos(self):
117        smb = self.create_connection()
118        smb.kerberosLogin(self.username, self.password, self.domain, '', '', '')
119        credentials = smb.getCredentials()
120        self.assertTrue( credentials == (self.username, self.password, self.domain, '','','', None, None) )
121        UNC = '\\\\%s\\%s' % (self.machine, self.share)
122        tid = smb.connectTree(UNC)
123        smb.logoff()
124
125    def test_loginKerberosAES(self):
126        smb = self.create_connection()
127        smb.kerberosLogin(self.username, '', self.domain, '', '', self.aesKey)
128        credentials = smb.getCredentials()
129        self.assertTrue( credentials == (self.username, '', self.domain, '','',self.aesKey, None, None) )
130        UNC = '\\\\%s\\%s' % (self.machine, self.share)
131        tid = smb.connectTree(UNC)
132        smb.logoff()
133
134    def test_listPath(self):
135        smb = self.create_connection()
136        smb.login(self.username, self.password, self.domain)
137        smb.listPath(self.share, '*')
138        smb.logoff()
139
140    def test_createFile(self):
141        smb = self.create_connection()
142        smb.login(self.username, self.password, self.domain)
143        tid = smb.connectTree(self.share)
144        fid = smb.createFile(tid, self.file)
145        smb.closeFile(tid,fid)
146        smb.rename(self.share, self.file, self.file + '.bak')
147        smb.deleteFile(self.share, self.file + '.bak')
148        smb.disconnectTree(tid)
149        smb.logoff()
150
151    def test_readwriteFile(self):
152        smb = self.create_connection()
153        smb.login(self.username, self.password, self.domain)
154        tid = smb.connectTree(self.share)
155        fid = smb.createFile(tid, self.file)
156        smb.writeFile(tid, fid, "A"*65535)
157        finished = False
158        data = ''
159        offset = 0
160        remaining = 65535
161        while remaining>0:
162            data += smb.readFile(tid,fid, offset, remaining)
163            remaining = 65535 - len(data)
164        self.assertTrue(len(data) == 65535)
165        self.assertTrue(data == "A"*65535)
166        smb.closeFile(tid,fid)
167        fid = smb.openFile(tid, self.file)
168        smb.closeFile(tid, fid)
169        smb.deleteFile(self.share, self.file)
170        smb.disconnectTree(tid)
171
172        smb.logoff()
173
174    def test_createdeleteDirectory(self):
175        smb = self.create_connection()
176        smb.login(self.username, self.password, self.domain)
177        smb.createDirectory(self.share, self.directory)
178        smb.deleteDirectory(self.share, self.directory)
179        smb.createDirectory(self.share, self.directory)
180        nested_dir = "%s\\%s" %(self.directory, self.directory)
181        smb.createDirectory(self.share, nested_dir)
182        try:
183            smb.deleteDirectory(self.share, self.directory)
184        except Exception as e:
185            if e.error == nt_errors.STATUS_DIRECTORY_NOT_EMPTY:
186                smb.deleteDirectory(self.share, nested_dir)
187                smb.deleteDirectory(self.share, self.directory)
188        smb.logoff()
189
190    def test_getData(self):
191        smb = self.create_connection()
192        smb.login(self.username, self.password, self.domain)
193        smb.getDialect()
194        smb.getServerName()
195        smb.getRemoteHost()
196        smb.getServerDomain()
197        smb.getServerOS()
198        smb.doesSupportNTLMv2()
199        smb.isLoginRequired()
200        smb.logoff()
201
202    def test_getServerName(self):
203        smb = self.create_connection()
204        smb.login(self.username, self.password, self.domain)
205        serverName = smb.getServerName()
206        self.assertTrue( serverName.upper() == self.serverName.upper() )
207        smb.logoff()
208
209    def test_getServerDNSDomainName(self):
210        smb = self.create_connection()
211        smb.login(self.username, self.password, self.domain)
212        serverDomain = smb.getServerDNSDomainName()
213        self.assertTrue( serverDomain.upper() == self.domain.upper())
214        smb.logoff()
215
216    def test_getServerDomain(self):
217        smb = self.create_connection()
218        smb.login(self.username, self.password, self.domain)
219        serverDomain = smb.getServerDomain()
220        self.assertTrue( serverDomain.upper() == self.domain.upper().split('.')[0])
221        smb.logoff()
222
223    def test_getRemoteHost(self):
224        smb = self.create_connection()
225        smb.login(self.username, self.password, self.domain)
226        remoteHost = smb.getRemoteHost()
227        self.assertTrue( remoteHost == self.machine)
228        smb.logoff()
229
230    def test_getDialect(self):
231        smb = self.create_connection()
232        smb.login(self.username, self.password, self.domain)
233        dialect = smb.getDialect()
234        self.assertTrue( dialect == self.dialects)
235        smb.logoff()
236
237    def test_uploadDownload(self):
238        smb = self.create_connection()
239        smb.login(self.username, self.password, self.domain)
240        f = open(self.upload)
241        smb.putFile(self.share, self.file, f.read)
242        f.close()
243        f = open(self.upload + '2', 'w+')
244        smb.getFile(self.share, self.file, f.write)
245        f.close()
246        os.unlink(self.upload + '2')
247        smb.deleteFile(self.share, self.file)
248        smb.logoff()
249
250    def test_listShares(self):
251        smb = self.create_connection()
252        smb.login(self.username, self.password, self.domain)
253        smb.listShares()
254        smb.logoff()
255
256    def test_getSessionKey(self):
257        smb = self.create_connection()
258        smb.login(self.username, self.password, self.domain)
259        smb.getSessionKey()
260        smb.logoff
261
262    def __is_socket_opened(self, s):
263        # We assume that if socket is selectable, it's open; and if it were not, it's closed.
264        # Note: this method is accurate as long as the file descriptor used for the socket is not re-used
265        is_socket_opened = True
266        try:
267            select.select([s], [], [], 0)
268        except socket.error, e:
269            if e[0] == errno.EBADF:
270                is_socket_opened = False
271        return is_socket_opened
272
273class SMB1Tests(SMBTests):
274    def setUp(self):
275        SMBTests.setUp(self)
276        # Put specific configuration for target machine with SMB1
277        configFile = ConfigParser.ConfigParser()
278        configFile.read('dcetests.cfg')
279        self.username = configFile.get('SMBTransport', 'username')
280        self.domain   = configFile.get('SMBTransport', 'domain')
281        self.serverName = configFile.get('SMBTransport', 'servername')
282        self.password = configFile.get('SMBTransport', 'password')
283        self.machine  = configFile.get('SMBTransport', 'machine')
284        self.hashes   = configFile.get('SMBTransport', 'hashes')
285        self.aesKey   = configFile.get('SMBTransport', 'aesKey128')
286        self.share    = 'C$'
287        self.file     = '/TEST'
288        self.directory= '/BETO'
289        self.upload   = '../../impacket/nt_errors.py'
290        self.flags2   = smb.SMB.FLAGS2_NT_STATUS | smb.SMB.FLAGS2_EXTENDED_SECURITY | smb.SMB.FLAGS2_LONG_NAMES
291        self.dialects = smb.SMB_DIALECT
292        self.sessPort = nmb.SMB_SESSION_PORT
293
294class SMB1TestsNetBIOS(SMBTests):
295    def setUp(self):
296        SMBTests.setUp(self)
297        # Put specific configuration for target machine with SMB1
298        configFile = ConfigParser.ConfigParser()
299        configFile.read('dcetests.cfg')
300        self.username = configFile.get('SMBTransport', 'username')
301        self.domain   = configFile.get('SMBTransport', 'domain')
302        self.serverName = configFile.get('SMBTransport', 'servername')
303        self.password = configFile.get('SMBTransport', 'password')
304        self.machine  = configFile.get('SMBTransport', 'machine')
305        self.hashes   = configFile.get('SMBTransport', 'hashes')
306        self.aesKey   = configFile.get('SMBTransport', 'aesKey128')
307        self.share    = 'C$'
308        self.file     = '/TEST'
309        self.directory= '/BETO'
310        self.upload   = '../../impacket/nt_errors.py'
311        self.flags2   = smb.SMB.FLAGS2_NT_STATUS | smb.SMB.FLAGS2_EXTENDED_SECURITY | smb.SMB.FLAGS2_LONG_NAMES
312        self.dialects = smb.SMB_DIALECT
313        self.sessPort = nmb.NETBIOS_SESSION_PORT
314
315class SMB1TestsUnicode(SMBTests):
316    def setUp(self):
317        SMBTests.setUp(self)
318        # Put specific configuration for target machine with SMB1
319        configFile = ConfigParser.ConfigParser()
320        configFile.read('dcetests.cfg')
321        self.username = configFile.get('SMBTransport', 'username')
322        self.domain   = configFile.get('SMBTransport', 'domain')
323        self.serverName = configFile.get('SMBTransport', 'servername')
324        self.password = configFile.get('SMBTransport', 'password')
325        self.machine  = configFile.get('SMBTransport', 'machine')
326        self.hashes   = configFile.get('SMBTransport', 'hashes')
327        self.aesKey   = configFile.get('SMBTransport', 'aesKey128')
328        self.share    = 'C$'
329        self.file     = '/TEST'
330        self.directory= '/BETO'
331        self.upload   = '../../impacket/nt_errors.py'
332        self.flags2   = smb.SMB.FLAGS2_UNICODE | smb.SMB.FLAGS2_NT_STATUS | smb.SMB.FLAGS2_EXTENDED_SECURITY | smb.SMB.FLAGS2_LONG_NAMES
333        self.dialects = smb.SMB_DIALECT
334        self.sessPort = nmb.SMB_SESSION_PORT
335
336class SMB002Tests(SMBTests):
337    def setUp(self):
338        # Put specific configuration for target machine with SMB_002
339        SMBTests.setUp(self)
340        configFile = ConfigParser.ConfigParser()
341        configFile.read('dcetests.cfg')
342        self.username = configFile.get('SMBTransport', 'username')
343        self.domain   = configFile.get('SMBTransport', 'domain')
344        self.serverName = configFile.get('SMBTransport', 'servername')
345        self.password = configFile.get('SMBTransport', 'password')
346        self.machine  = configFile.get('SMBTransport', 'machine')
347        self.hashes   = configFile.get('SMBTransport', 'hashes')
348        self.aesKey   = configFile.get('SMBTransport', 'aesKey128')
349        self.share    = 'C$'
350        self.file     = '/TEST'
351        self.directory= '/BETO'
352        self.upload   = '../../impacket/nt_errors.py'
353        self.dialects = SMB2_DIALECT_002
354        self.sessPort = nmb.SMB_SESSION_PORT
355
356class SMB21Tests(SMBTests):
357    def setUp(self):
358        # Put specific configuration for target machine with SMB 2.1
359        SMBTests.setUp(self)
360        configFile = ConfigParser.ConfigParser()
361        configFile.read('dcetests.cfg')
362        self.username = configFile.get('SMBTransport', 'username')
363        self.domain   = configFile.get('SMBTransport', 'domain')
364        self.serverName = configFile.get('SMBTransport', 'servername')
365        self.password = configFile.get('SMBTransport', 'password')
366        self.machine  = configFile.get('SMBTransport', 'machine')
367        self.hashes   = configFile.get('SMBTransport', 'hashes')
368        self.aesKey   = configFile.get('SMBTransport', 'aesKey128')
369        self.share    = 'C$'
370        self.file     = '/TEST'
371        self.directory= '/BETO'
372        self.upload   = '../../impacket/nt_errors.py'
373        self.dialects = SMB2_DIALECT_21
374        self.sessPort = nmb.SMB_SESSION_PORT
375
376class SMB3Tests(SMBTests):
377    def setUp(self):
378        # Put specific configuration for target machine with SMB3
379        SMBTests.setUp(self)
380        configFile = ConfigParser.ConfigParser()
381        configFile.read('dcetests.cfg')
382        self.username = configFile.get('SMBTransport', 'username')
383        self.domain   = configFile.get('SMBTransport', 'domain')
384        self.serverName = configFile.get('SMBTransport', 'servername')
385        self.password = configFile.get('SMBTransport', 'password')
386        self.machine  = configFile.get('SMBTransport', 'machine')
387        self.hashes   = configFile.get('SMBTransport', 'hashes')
388        self.aesKey   = configFile.get('SMBTransport', 'aesKey128')
389        self.share    = 'C$'
390        self.file     = '/TEST'
391        self.directory= '/BETO'
392        self.upload   = '../../impacket/nt_errors.py'
393        self.dialects = SMB2_DIALECT_30
394        self.sessPort = nmb.SMB_SESSION_PORT
395
396if __name__ == "__main__":
397    suite = unittest.TestLoader().loadTestsFromTestCase(SMB1Tests)
398    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB1TestsNetBIOS))
399    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB1TestsUnicode))
400    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB002Tests))
401    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB21Tests))
402    suite.addTests(unittest.TestLoader().loadTestsFromTestCase(SMB3Tests))
403    unittest.TextTestRunner(verbosity=1).run(suite)
404