1# -*- coding: utf-8 -*-
2
3import os, tempfile, random, time
4from io import BytesIO
5from smb.SMBConnection import SMBConnection
6from .util import getConnectionInfo
7from nose.tools import with_setup
8from smb import smb_structs
9
try:
    import hashlib
except ImportError:
    # Fall back to the standalone md5 module when hashlib is unavailable.
    import md5

    def MD5():
        """Return a fresh MD5 hash object built with the legacy md5 module."""
        return md5.new()
else:
    def MD5():
        """Return a fresh MD5 hash object built with hashlib."""
        return hashlib.md5()
16
# Connection shared between the setup/teardown hooks and the tests below;
# the setup functions assign it, teardown_func closes it.
conn = None

# Local reference file that the store tests upload.
TEST_FILENAME = os.path.join(
    os.path.dirname(__file__),
    os.pardir,
    'SupportFiles',
    'binary.dat',
)
# Size the tests expect storeFile()/retrieveFile() to report for that file.
TEST_FILESIZE = 256000
# MD5 hex digest the downloaded content is compared against.
TEST_DIGEST = 'bb6303f76e29f354b6fdf6ef58587e48'
22
def setup_func_SMB1():
    """Open the module-level connection with SMB2 support switched off.

    Clears smb_structs.SUPPORT_SMB2, builds an SMBConnection from the values
    returned by getConnectionInfo() and asserts that connect() succeeds.
    """
    global conn
    smb_structs.SUPPORT_SMB2 = False

    cfg = getConnectionInfo()
    conn = SMBConnection(
        cfg['user'],
        cfg['password'],
        cfg['client_name'],
        cfg['server_name'],
        use_ntlm_v2 = True,
    )
    connected = conn.connect(cfg['server_ip'], cfg['server_port'])
    assert connected
30
def setup_func_SMB2():
    """Open the module-level connection with SMB2 support switched on.

    Sets smb_structs.SUPPORT_SMB2, builds an SMBConnection from the values
    returned by getConnectionInfo() and asserts that connect() succeeds.
    """
    global conn
    smb_structs.SUPPORT_SMB2 = True

    cfg = getConnectionInfo()
    conn = SMBConnection(
        cfg['user'],
        cfg['password'],
        cfg['client_name'],
        cfg['server_name'],
        use_ntlm_v2 = True,
    )
    connected = conn.connect(cfg['server_ip'], cfg['server_port'])
    assert connected
38
def teardown_func():
    """Close the module-level connection opened by one of the setup hooks."""
    # Only reads the module-level name, so no ``global`` declaration is needed.
    conn.close()
42
43
@with_setup(setup_func_SMB1, teardown_func)
def test_store_long_filename_SMB1():
    """Round-trip the reference file through the 'smbtest' share (SMB1 setup).

    Uploads TEST_FILENAME under a unique name containing a space, checks that
    the name appears in the directory listing, downloads it again and compares
    size and MD5 digest with the expected values, then deletes the remote file.
    """
    filename = os.sep + 'StoreTest %d-%d.dat' % ( time.time(), random.randint(0, 10000) )

    # Close the local file deterministically instead of leaving the handle
    # to the garbage collector.
    with open(TEST_FILENAME, 'rb') as source:
        filesize = conn.storeFile('smbtest', filename, source)
    assert filesize == TEST_FILESIZE

    entries = conn.listPath('smbtest', os.path.dirname(filename.replace('/', os.sep)))
    filenames = [e.filename for e in entries]
    assert os.path.basename(filename.replace('/', os.sep)) in filenames

    # The context manager releases the buffer even when an assertion fails.
    with BytesIO() as buf:
        file_attributes, file_size = conn.retrieveFile('smbtest', filename, buf)
        assert file_size == TEST_FILESIZE

        md = MD5()
        md.update(buf.getvalue())
        assert md.hexdigest() == TEST_DIGEST

    conn.deleteFiles('smbtest', filename)
65
66
@with_setup(setup_func_SMB2, teardown_func)
def test_store_long_filename_SMB2():
    """Round-trip the reference file through the 'smbtest' share (SMB2 setup).

    Uploads TEST_FILENAME under a unique name containing a space, checks that
    the name appears in the directory listing, downloads it again and compares
    size and MD5 digest with the expected values, then deletes the remote file.
    """
    filename = os.sep + 'StoreTest %d-%d.dat' % ( time.time(), random.randint(0, 10000) )

    # Close the local file deterministically instead of leaving the handle
    # to the garbage collector.
    with open(TEST_FILENAME, 'rb') as source:
        filesize = conn.storeFile('smbtest', filename, source)
    assert filesize == TEST_FILESIZE

    entries = conn.listPath('smbtest', os.path.dirname(filename.replace('/', os.sep)))
    filenames = [e.filename for e in entries]
    assert os.path.basename(filename.replace('/', os.sep)) in filenames

    # The context manager releases the buffer even when an assertion fails.
    with BytesIO() as buf:
        file_attributes, file_size = conn.retrieveFile('smbtest', filename, buf)
        assert file_size == TEST_FILESIZE

        md = MD5()
        md.update(buf.getvalue())
        assert md.hexdigest() == TEST_DIGEST

    conn.deleteFiles('smbtest', filename)
88
89
@with_setup(setup_func_SMB1, teardown_func)
def test_store_unicode_filename_SMB1():
    """Round-trip the reference file under a non-ASCII name (SMB1 setup).

    Uploads TEST_FILENAME to the 'smbtest' share under a unique name with
    Chinese characters, checks that the name appears in the directory listing,
    downloads it again and compares size and MD5 digest with the expected
    values, then deletes the remote file.
    """
    filename = os.sep + '上载测试 %d-%d.dat' % ( time.time(), random.randint(0, 10000) )

    # Close the local file deterministically instead of leaving the handle
    # to the garbage collector.
    with open(TEST_FILENAME, 'rb') as source:
        filesize = conn.storeFile('smbtest', filename, source)
    assert filesize == TEST_FILESIZE

    entries = conn.listPath('smbtest', os.path.dirname(filename.replace('/', os.sep)))
    filenames = [e.filename for e in entries]
    assert os.path.basename(filename.replace('/', os.sep)) in filenames

    # The context manager releases the buffer even when an assertion fails.
    with BytesIO() as buf:
        file_attributes, file_size = conn.retrieveFile('smbtest', filename, buf)
        assert file_size == TEST_FILESIZE

        md = MD5()
        md.update(buf.getvalue())
        assert md.hexdigest() == TEST_DIGEST

    conn.deleteFiles('smbtest', filename)
111
112
@with_setup(setup_func_SMB1, teardown_func)
def test_store_from_offset_SMB1():
    """Overwrite part of a remote file via storeFileFromOffset (SMB1 setup).

    Stores ten digits, writes b'aa' at offset 5, then downloads the file and
    checks that only bytes 5-6 changed and the size is still 10.
    """
    remote_path = os.sep + 'StoreTest %d-%d.dat' % ( time.time(), random.randint(0, 10000) )

    # Initial content: ten bytes.
    stored = conn.storeFile('smbtest', remote_path, BytesIO(b'0123456789'))
    assert stored == 10

    # Patch two bytes in the middle; the call is expected to return 7 (5 + 2).
    end_pos = conn.storeFileFromOffset('smbtest', remote_path, BytesIO(b'aa'), 5)
    assert end_pos == 7

    sink = BytesIO()
    _attributes, remote_size = conn.retrieveFile('smbtest', remote_path, sink)
    assert remote_size == 10
    assert sink.getvalue() == b'01234aa789'
    sink.close()

    conn.deleteFiles('smbtest', remote_path)
132
@with_setup(setup_func_SMB2, teardown_func)
def test_store_unicode_filename_SMB2():
    """Round-trip the reference file under a non-ASCII name (SMB2 setup).

    Uploads TEST_FILENAME to the 'smbtest' share under a unique name with
    Chinese characters, checks that the name appears in the directory listing,
    downloads it again and compares size and MD5 digest with the expected
    values, then deletes the remote file.
    """
    filename = os.sep + '上载测试 %d-%d.dat' % ( time.time(), random.randint(0, 10000) )

    # Close the local file deterministically instead of leaving the handle
    # to the garbage collector.
    with open(TEST_FILENAME, 'rb') as source:
        filesize = conn.storeFile('smbtest', filename, source)
    assert filesize == TEST_FILESIZE

    entries = conn.listPath('smbtest', os.path.dirname(filename.replace('/', os.sep)))
    filenames = [e.filename for e in entries]
    assert os.path.basename(filename.replace('/', os.sep)) in filenames

    # The context manager releases the buffer even when an assertion fails.
    with BytesIO() as buf:
        file_attributes, file_size = conn.retrieveFile('smbtest', filename, buf)
        assert file_size == TEST_FILESIZE

        md = MD5()
        md.update(buf.getvalue())
        assert md.hexdigest() == TEST_DIGEST

    conn.deleteFiles('smbtest', filename)
154
@with_setup(setup_func_SMB2, teardown_func)
def test_store_from_offset_SMB2():
    """Overwrite part of a remote file via storeFileFromOffset (SMB2 setup).

    Stores ten digits, writes b'aa' at offset 5, then downloads the file and
    checks that only bytes 5-6 changed and the size is still 10.
    """
    remote_path = os.sep + 'StoreTest %d-%d.dat' % ( time.time(), random.randint(0, 10000) )

    # Initial content: ten bytes.
    stored = conn.storeFile('smbtest', remote_path, BytesIO(b'0123456789'))
    assert stored == 10

    # Patch two bytes in the middle; the call is expected to return 7 (5 + 2).
    end_pos = conn.storeFileFromOffset('smbtest', remote_path, BytesIO(b'aa'), 5)
    assert end_pos == 7

    sink = BytesIO()
    _attributes, remote_size = conn.retrieveFile('smbtest', remote_path, sink)
    assert remote_size == 10
    assert sink.getvalue() == b'01234aa789'
    sink.close()

    conn.deleteFiles('smbtest', remote_path)
174