1# Copyright (c) 2012-2016 Seafile Ltd.
2from importlib import import_module
3import os
4from uuid import uuid4
5
6from django.core.cache import cache
7from django.urls import reverse
8from django.conf import settings
9from django.http import SimpleCookie
10from django.test import RequestFactory
11from django.test import TestCase
12from django.test import override_settings
13from exam.decorators import fixture
14from exam.cases import Exam
15import seaserv
16from seaserv import seafile_api, ccnet_threaded_rpc, ccnet_api, create_org,\
17        seafserv_threaded_rpc
18
19from seahub.group.utils import is_group_member
20from seahub.utils import mkstemp
21from seahub.base.accounts import User
22from seahub.utils.file_size import get_file_size_unit
23from seahub.base.templatetags.seahub_tags import email2nickname,\
24        email2contact_email
25from seahub.share.models import ExtraSharePermission, ExtraGroupsSharePermission
26from seahub.role_permissions.models import AdminRole
27
28
29TRAVIS = 'TRAVIS' in os.environ
30
31
32class Fixtures(Exam):
33    user_password = 'secret'
34    admin_password = 'secret'
35    org_user_password = 'secret'
36    org_admin_password = 'secret'
37    org_user_name = uuid4().hex + '@org.com'
38    org_admin_name = uuid4().hex + '@org.com'
39    ip = '127.0.0.1'
40    ip_v6 = '2001:0db8:85a3:0000:0000:8a2e:0370:7334'
41
42    @fixture
43    def fake_request(self):
44        # Every test needs access to the request factory.
45        factory = RequestFactory()
46
47        # Create an instance of a GET request.
48        fake_request = factory.get('/foo/')
49        fake_request.user = self.user
50        fake_request.cloud_mode = False
51
52        return fake_request
53
54    @fixture
55    def user(self):
56        return self.create_user('test@test.com')
57
58    @fixture
59    def admin(self):
60        return self.create_user('admin@test.com', is_staff=True)
61
62    @fixture
63    def admin_cannot_view_system_info(self):
64        user = self.create_user('admin_no_permission@test.com', is_staff=True)
65        AdminRole.objects.add_admin_role(user.username, 'cannot_view_system_info')
66        return user
67
68    @fixture
69    def admin_cannot_view_statistic(self):
70        user = self.create_user('admin_no_permission@test.com', is_staff=True)
71        AdminRole.objects.add_admin_role(user.username, 'cannot_view_statistic')
72        return user
73
74    @fixture
75    def admin_cannot_config_system(self):
76        user = self.create_user('admin_no_permission@test.com', is_staff=True)
77        AdminRole.objects.add_admin_role(user.username, 'cannot_config_system')
78        return user
79
80    @fixture
81    def admin_cannot_manage_library(self):
82        user = self.create_user('admin_no_permission@test.com', is_staff=True)
83        AdminRole.objects.add_admin_role(user.username, 'cannot_manage_library')
84        return user
85
86    @fixture
87    def admin_cannot_manage_user(self):
88        user = self.create_user('admin_no_permission@test.com', is_staff=True)
89        AdminRole.objects.add_admin_role(user.username, 'cannot_manage_user')
90        return user
91
92    @fixture
93    def admin_cannot_manage_group(self):
94        user = self.create_user('admin_no_permission@test.com', is_staff=True)
95        AdminRole.objects.add_admin_role(user.username, 'cannot_manage_group')
96        return user
97
98    @fixture
99    def admin_cannot_view_user_log(self):
100        user = self.create_user('admin_no_permission@test.com', is_staff=True)
101        AdminRole.objects.add_admin_role(user.username, 'cannot_view_user_log')
102        return user
103
104    @fixture
105    def admin_cannot_view_admin_log(self):
106        user = self.create_user('admin_no_permission@test.com', is_staff=True)
107        AdminRole.objects.add_admin_role(user.username, 'cannot_view_admin_log')
108        return user
109
110    @fixture
111    def admin_no_other_permission(self):
112        user = self.create_user('admin_no_permission@test.com', is_staff=True)
113        AdminRole.objects.add_admin_role(user.username, 'no_other_permission')
114        return user
115
116    @fixture
117    def repo(self):
118        r = seafile_api.get_repo(self.create_repo(name='test-repo', desc='',
119                                                  username=self.user.username,
120                                                  passwd=None))
121        return r
122
123    @fixture
124    def enc_repo(self):
125        r = seafile_api.get_repo(self.create_repo(name='test-enc-repo', desc='',
126                                                  username=self.user.username,
127                                                  passwd='123'))
128
129        self.create_file(repo_id=r.id, parent_dir='/',
130                         filename='test.txt', username='test@test.com')
131
132        assert r is not None
133        return r
134
135    @fixture
136    def file(self):
137        return self.create_file(repo_id=self.repo.id,
138                                parent_dir='/',
139                                filename='test.txt',
140                                username='test@test.com')
141
142    @fixture
143    def folder(self):
144        return self.create_folder(repo_id=self.repo.id,
145                                  parent_dir='/',
146                                  dirname='folder',
147                                  username='test@test.com')
148
149    @fixture
150    def org_folder(self):
151        return self.create_folder(repo_id=self.org_repo.id,
152                                  parent_dir='/',
153                                  dirname='folder',
154                                  username=self.org_user.email)
155
156    @fixture
157    def group(self):
158        return self.create_group(group_name='test_group',
159                                 username=self.user.username)
160
161    @fixture
162    def org_group(self):
163        return self.create_org_group(group_name='test_org_group',
164                                     username=self.org_user.email)
165
166    @fixture
167    def org(self):
168        return self.create_org()
169
170    @fixture
171    def org_user(self):
172        return self.create_org_user()
173
174    @fixture
175    def org_admin(self):
176        return self.create_org_admin_user()
177
178    @fixture
179    def org_repo(self):
180        r = seafile_api.get_repo(self.create_org_repo(name='test-org-repo', desc='',
181                                                      username=self.org_user.email,
182                                                      passwd=None, org_id=self.org.org_id))
183        return r
184
185    def create_user(self, email=None, **kwargs):
186        if not email:
187            email = uuid4().hex + '@test.com'
188
189        kwargs.setdefault('email', email)
190        kwargs.setdefault('is_staff', False)
191        kwargs.setdefault('is_active', True)
192
193        return User.objects.create_user(password='secret', **kwargs)
194
195    def remove_user(self, email=None):
196        if not email:
197            email = self.user.username
198        try:
199            User.objects.get(email).delete()
200        except User.DoesNotExist:
201            pass
202        for g in ccnet_api.get_groups(email):
203            ccnet_threaded_rpc.remove_group(g.id, email)
204
205    def create_repo(self, **kwargs):
206        repo_id = seafile_api.create_repo(**kwargs)
207        return repo_id
208
209    def create_org_repo(self, **kwargs):
210        repo_id = seafile_api.create_org_repo(**kwargs)
211        return repo_id
212
213    def remove_repo(self, repo_id=None):
214        if not repo_id:
215            repo_id = self.repo.id
216        return seafile_api.remove_repo(repo_id)
217
218    def remove_org_repo(self, repo_id=None):
219        if not repo_id:
220            repo_id = self.org_repo.id
221        return seafile_api.remove_repo(repo_id)
222
223    def create_file(self, **kwargs):
224        seafile_api.post_empty_file(**kwargs)
225        return kwargs['parent_dir'] + kwargs['filename']
226
227    def create_file_with_content(self, file_name, parent_dir='/', content='junk content',
228                                 username=''):
229        seafile_api.post_empty_file(self.repo.id, parent_dir, file_name, username)
230
231        # first dump the file content to a tmp file, then update the file
232        fd, tmp_file = mkstemp()
233
234        try:
235            bytesWritten = os.write(fd, content.encode('utf-8'))
236        except:
237            bytesWritten = -1
238        finally:
239            os.close(fd)
240
241        assert bytesWritten > 0
242
243        seafile_api.put_file(self.repo.id, tmp_file, parent_dir, file_name,
244                             '', None)
245        return parent_dir + file_name
246
247    def create_folder(self, **kwargs):
248        seafile_api.post_dir(**kwargs)
249        return kwargs['parent_dir'] + kwargs['dirname']
250
251    def remove_folder(self):
252        seafile_api.del_file(self.repo.id, os.path.dirname(self.folder),
253                             os.path.basename(self.folder), self.user.username)
254
255    def create_group(self, **kwargs):
256        group_name = kwargs['group_name']
257        username = kwargs['username']
258        group_id = ccnet_threaded_rpc.create_group(group_name, username)
259        return ccnet_threaded_rpc.get_group(group_id)
260
261    def create_org_group(self, **kwargs):
262        group_name = kwargs['group_name']
263        username = kwargs['username']
264        org_group_id = ccnet_threaded_rpc.create_org_group(self.org.org_id, group_name, username)
265        org_groups = ccnet_threaded_rpc.get_org_groups(self.org.org_id, -1, -1)
266        res_group = None
267        for group in org_groups:
268            if group.id == org_group_id:
269                res_group = group
270        return res_group
271
272    def remove_group(self, group_id=None):
273        if not group_id:
274            group_id = self.group.id
275        return ccnet_threaded_rpc.remove_group(group_id, self.user.username)
276
277    def create_org(self, username=org_user_name, password=org_user_password, org_name='org', prefix='org_p', quota=100, member_limit=100):
278        new_org = ccnet_threaded_rpc.get_org_by_url_prefix(prefix)
279        if new_org:
280            return new_org
281
282        quota = int(quota)
283        User.objects.create_user(username, password, is_staff=False, is_active=True)
284        create_org(org_name, prefix, username)
285        new_org = ccnet_threaded_rpc.get_org_by_url_prefix(prefix)
286        from seahub_extra.organizations.models import OrgMemberQuota
287        OrgMemberQuota.objects.set_quota(new_org.org_id, member_limit)
288        quota = quota * get_file_size_unit('MB')
289        seafserv_threaded_rpc.set_org_quota(new_org.org_id, quota)
290
291        return new_org
292
293    def create_org_user(self, email=org_admin_name, password=org_user_password):
294        user = None
295        try:
296            user = User.objects.get(email=email)
297        except User.DoesNotExist:
298            user = User.objects.create_user(email, password, is_staff=False, is_active=True)
299        ccnet_api.add_org_user(self.org.org_id, email, 0)
300        return user
301
302
303    def create_org_admin_user(self, email="admin@org.com", password=org_admin_password):
304        admin = None
305        try:
306            admin = User.objects.get(email=email)
307        except User.DoesNotExist:
308            admin = User.objects.create_user(email, password, is_staff=False, is_active=True)
309        ccnet_api.add_org_user(self.org.org_id, email, 1)
310        return admin
311
312    def get_org_user_info(self, user, org):
313        user_info = {}
314        user_info['org_id'] = org.org_id
315        user_info['active'] = user.is_active
316        user_info['email'] = user.email
317        user_info['name'] = email2nickname(user.email)
318        user_info['contact_email'] = email2contact_email(user.email)
319
320        org_user_quota = seafile_api.get_org_user_quota(org.org_id, user.email)
321        user_info['quota_total'] = org_user_quota / get_file_size_unit('MB')
322
323        org_user_quota_usage = seafile_api.get_org_user_quota_usage(org.org_id, user.email)
324        user_info['quota_usage'] = org_user_quota_usage / get_file_size_unit('MB')
325
326        return user_info
327
328    def share_repo_to_admin_with_r_permission(self):
329        # share user's repo to admin with 'r' permission
330        seafile_api.share_repo(self.repo.id, self.user.username,
331                self.admin.username, 'r')
332
333    def share_repo_to_admin_with_rw_permission(self):
334        # share user's repo to admin with 'rw' permission
335        seafile_api.share_repo(self.repo.id, self.user.username,
336                self.admin.username, 'rw')
337
338    def share_repo_to_admin_with_admin_permission(self):
339        # share user's repo to admin with 'admin' permission
340        seafile_api.share_repo(self.repo.id, self.user.username,
341                self.admin.username, 'rw')
342        ExtraSharePermission.objects.create_share_permission(self.repo.id, self.admin.username, 'admin')
343
344    def share_org_repo_to_org_admin_with_rw_permission(self):
345        seaserv.seafserv_threaded_rpc.org_add_share(self.org.org_id, self.org_repo.repo_id,
346                                                    self.org_user.username, self.org_admin.username,
347                                                    'rw')
348
349    def share_org_repo_to_org_admin_with_r_permission(self):
350        seaserv.seafserv_threaded_rpc.org_add_share(self.org.org_id, self.org_repo.repo_id,
351                                                    self.org_user.username, self.org_admin.username,
352                                                    'r')
353
354    def set_user_folder_r_permission_to_admin(self):
355
356        # share user's repo to admin with 'rw' permission
357        seafile_api.share_repo(self.repo.id, self.user.username,
358                self.admin.username, 'rw')
359
360        # set user sub-folder 'r' permisson to admin
361        seafile_api.add_folder_user_perm(self.repo.id,
362                self.folder, 'r', self.admin.username)
363
364        # admin can visit user sub-folder with 'r' permission
365        assert seafile_api.check_permission_by_path(self.repo.id,
366                self.folder, self.admin.username) == 'r'
367
368    def set_user_folder_rw_permission_to_admin(self):
369
370        # share user's repo to admin with 'r' permission
371        seafile_api.share_repo(self.repo.id, self.user.username,
372                self.admin.username, 'r')
373
374        # set user sub-folder 'rw' permisson to admin
375        seafile_api.add_folder_user_perm(self.repo.id,
376                self.folder, 'rw', self.admin.username)
377
378        # admin can visit user sub-folder with 'rw' permission
379        assert seafile_api.check_permission_by_path(self.repo.id,
380                self.folder, self.admin.username) == 'rw'
381
382    def share_repo_to_group_with_r_permission(self):
383        seafile_api.set_group_repo(
384                self.repo.id, self.group.id, self.user.username, 'r')
385
386    def share_repo_to_group_with_rw_permission(self):
387        seafile_api.set_group_repo(
388                self.repo.id, self.group.id, self.user.username, 'rw')
389
390    def share_repo_to_group_with_admin_permission(self):
391        seafile_api.set_group_repo(
392                self.repo.id, self.group.id, self.user.username, 'rw')
393        ExtraGroupsSharePermission.objects.create_share_permission(self.repo.id, self.group.id, 'admin')
394
395    def share_org_repo_to_org_group_with_r_permission(self):
396        seafile_api.add_org_group_repo(self.org_repo.repo_id, self.org.org_id,
397                                       self.org_group.id, self.org_user.username, 'r')
398
399    def share_org_repo_to_org_group_with_rw_permission(self):
400        seafile_api.add_org_group_repo(self.org_repo.repo_id, self.org.org_id,
401                                       self.org_group.id, self.org_user.username, 'rw')
402
403    def add_admin_to_group(self):
404        ccnet_api.group_add_member(
405                self.group.id, self.user.username, self.admin.username)
406
407        assert is_group_member(self.group.id, self.admin.username)
408
409
410class BaseTestCase(TestCase, Fixtures):
411    def tearDown(self):
412        self.remove_repo(self.repo.id)
413        self.remove_repo(self.enc_repo.id)
414
415    def login_as(self, user, password=None):
416        if isinstance(user, str):
417            login = user
418        elif isinstance(user, User):
419            login = user.username
420        else:
421            assert False
422
423        password = password if password else self.user_password
424
425        return self.client.post(
426            reverse('auth_login'), {'login': login,
427                                    'password': password}
428        )
429
430    def logout(self):
431        """
432        Removes the authenticated user's cookies and session object.
433
434        Causes the authenticated user to be logged out.
435        """
436        session = import_module(settings.SESSION_ENGINE).SessionStore()
437        session_cookie = self.client.cookies.get(settings.SESSION_COOKIE_NAME)
438
439        if session_cookie:
440            session.delete(session_key=session_cookie.value)
441        self.client.cookies = SimpleCookie()
442
443    def clear_cache(self):
444        # clear cache between every test case to avoid config option cache
445        # issue which cause test failed
446        cache.clear()
447