1# Copyright (c) 2012-2016 Seafile Ltd. 2from importlib import import_module 3import os 4from uuid import uuid4 5 6from django.core.cache import cache 7from django.urls import reverse 8from django.conf import settings 9from django.http import SimpleCookie 10from django.test import RequestFactory 11from django.test import TestCase 12from django.test import override_settings 13from exam.decorators import fixture 14from exam.cases import Exam 15import seaserv 16from seaserv import seafile_api, ccnet_threaded_rpc, ccnet_api, create_org,\ 17 seafserv_threaded_rpc 18 19from seahub.group.utils import is_group_member 20from seahub.utils import mkstemp 21from seahub.base.accounts import User 22from seahub.utils.file_size import get_file_size_unit 23from seahub.base.templatetags.seahub_tags import email2nickname,\ 24 email2contact_email 25from seahub.share.models import ExtraSharePermission, ExtraGroupsSharePermission 26from seahub.role_permissions.models import AdminRole 27 28 29TRAVIS = 'TRAVIS' in os.environ 30 31 32class Fixtures(Exam): 33 user_password = 'secret' 34 admin_password = 'secret' 35 org_user_password = 'secret' 36 org_admin_password = 'secret' 37 org_user_name = uuid4().hex + '@org.com' 38 org_admin_name = uuid4().hex + '@org.com' 39 ip = '127.0.0.1' 40 ip_v6 = '2001:0db8:85a3:0000:0000:8a2e:0370:7334' 41 42 @fixture 43 def fake_request(self): 44 # Every test needs access to the request factory. 45 factory = RequestFactory() 46 47 # Create an instance of a GET request. 48 fake_request = factory.get('/foo/') 49 fake_request.user = self.user 50 fake_request.cloud_mode = False 51 52 return fake_request 53 54 @fixture 55 def user(self): 56 return self.create_user('test@test.com') 57 58 @fixture 59 def admin(self): 60 return self.create_user('admin@test.com', is_staff=True) 61 62 @fixture 63 def admin_cannot_view_system_info(self): 64 user = self.create_user('admin_no_permission@test.com', is_staff=True) 65 AdminRole.objects.add_admin_role(user.username, 'cannot_view_system_info') 66 return user 67 68 @fixture 69 def admin_cannot_view_statistic(self): 70 user = self.create_user('admin_no_permission@test.com', is_staff=True) 71 AdminRole.objects.add_admin_role(user.username, 'cannot_view_statistic') 72 return user 73 74 @fixture 75 def admin_cannot_config_system(self): 76 user = self.create_user('admin_no_permission@test.com', is_staff=True) 77 AdminRole.objects.add_admin_role(user.username, 'cannot_config_system') 78 return user 79 80 @fixture 81 def admin_cannot_manage_library(self): 82 user = self.create_user('admin_no_permission@test.com', is_staff=True) 83 AdminRole.objects.add_admin_role(user.username, 'cannot_manage_library') 84 return user 85 86 @fixture 87 def admin_cannot_manage_user(self): 88 user = self.create_user('admin_no_permission@test.com', is_staff=True) 89 AdminRole.objects.add_admin_role(user.username, 'cannot_manage_user') 90 return user 91 92 @fixture 93 def admin_cannot_manage_group(self): 94 user = self.create_user('admin_no_permission@test.com', is_staff=True) 95 AdminRole.objects.add_admin_role(user.username, 'cannot_manage_group') 96 return user 97 98 @fixture 99 def admin_cannot_view_user_log(self): 100 user = self.create_user('admin_no_permission@test.com', is_staff=True) 101 AdminRole.objects.add_admin_role(user.username, 'cannot_view_user_log') 102 return user 103 104 @fixture 105 def admin_cannot_view_admin_log(self): 106 user = self.create_user('admin_no_permission@test.com', is_staff=True) 107 AdminRole.objects.add_admin_role(user.username, 'cannot_view_admin_log') 108 return user 109 110 @fixture 111 def admin_no_other_permission(self): 112 user = self.create_user('admin_no_permission@test.com', is_staff=True) 113 AdminRole.objects.add_admin_role(user.username, 'no_other_permission') 114 return user 115 116 @fixture 117 def repo(self): 118 r = seafile_api.get_repo(self.create_repo(name='test-repo', desc='', 119 username=self.user.username, 120 passwd=None)) 121 return r 122 123 @fixture 124 def enc_repo(self): 125 r = seafile_api.get_repo(self.create_repo(name='test-enc-repo', desc='', 126 username=self.user.username, 127 passwd='123')) 128 129 self.create_file(repo_id=r.id, parent_dir='/', 130 filename='test.txt', username='test@test.com') 131 132 assert r is not None 133 return r 134 135 @fixture 136 def file(self): 137 return self.create_file(repo_id=self.repo.id, 138 parent_dir='/', 139 filename='test.txt', 140 username='test@test.com') 141 142 @fixture 143 def folder(self): 144 return self.create_folder(repo_id=self.repo.id, 145 parent_dir='/', 146 dirname='folder', 147 username='test@test.com') 148 149 @fixture 150 def org_folder(self): 151 return self.create_folder(repo_id=self.org_repo.id, 152 parent_dir='/', 153 dirname='folder', 154 username=self.org_user.email) 155 156 @fixture 157 def group(self): 158 return self.create_group(group_name='test_group', 159 username=self.user.username) 160 161 @fixture 162 def org_group(self): 163 return self.create_org_group(group_name='test_org_group', 164 username=self.org_user.email) 165 166 @fixture 167 def org(self): 168 return self.create_org() 169 170 @fixture 171 def org_user(self): 172 return self.create_org_user() 173 174 @fixture 175 def org_admin(self): 176 return self.create_org_admin_user() 177 178 @fixture 179 def org_repo(self): 180 r = seafile_api.get_repo(self.create_org_repo(name='test-org-repo', desc='', 181 username=self.org_user.email, 182 passwd=None, org_id=self.org.org_id)) 183 return r 184 185 def create_user(self, email=None, **kwargs): 186 if not email: 187 email = uuid4().hex + '@test.com' 188 189 kwargs.setdefault('email', email) 190 kwargs.setdefault('is_staff', False) 191 kwargs.setdefault('is_active', True) 192 193 return User.objects.create_user(password='secret', **kwargs) 194 195 def remove_user(self, email=None): 196 if not email: 197 email = self.user.username 198 try: 199 User.objects.get(email).delete() 200 except User.DoesNotExist: 201 pass 202 for g in ccnet_api.get_groups(email): 203 ccnet_threaded_rpc.remove_group(g.id, email) 204 205 def create_repo(self, **kwargs): 206 repo_id = seafile_api.create_repo(**kwargs) 207 return repo_id 208 209 def create_org_repo(self, **kwargs): 210 repo_id = seafile_api.create_org_repo(**kwargs) 211 return repo_id 212 213 def remove_repo(self, repo_id=None): 214 if not repo_id: 215 repo_id = self.repo.id 216 return seafile_api.remove_repo(repo_id) 217 218 def remove_org_repo(self, repo_id=None): 219 if not repo_id: 220 repo_id = self.org_repo.id 221 return seafile_api.remove_repo(repo_id) 222 223 def create_file(self, **kwargs): 224 seafile_api.post_empty_file(**kwargs) 225 return kwargs['parent_dir'] + kwargs['filename'] 226 227 def create_file_with_content(self, file_name, parent_dir='/', content='junk content', 228 username=''): 229 seafile_api.post_empty_file(self.repo.id, parent_dir, file_name, username) 230 231 # first dump the file content to a tmp file, then update the file 232 fd, tmp_file = mkstemp() 233 234 try: 235 bytesWritten = os.write(fd, content.encode('utf-8')) 236 except: 237 bytesWritten = -1 238 finally: 239 os.close(fd) 240 241 assert bytesWritten > 0 242 243 seafile_api.put_file(self.repo.id, tmp_file, parent_dir, file_name, 244 '', None) 245 return parent_dir + file_name 246 247 def create_folder(self, **kwargs): 248 seafile_api.post_dir(**kwargs) 249 return kwargs['parent_dir'] + kwargs['dirname'] 250 251 def remove_folder(self): 252 seafile_api.del_file(self.repo.id, os.path.dirname(self.folder), 253 os.path.basename(self.folder), self.user.username) 254 255 def create_group(self, **kwargs): 256 group_name = kwargs['group_name'] 257 username = kwargs['username'] 258 group_id = ccnet_threaded_rpc.create_group(group_name, username) 259 return ccnet_threaded_rpc.get_group(group_id) 260 261 def create_org_group(self, **kwargs): 262 group_name = kwargs['group_name'] 263 username = kwargs['username'] 264 org_group_id = ccnet_threaded_rpc.create_org_group(self.org.org_id, group_name, username) 265 org_groups = ccnet_threaded_rpc.get_org_groups(self.org.org_id, -1, -1) 266 res_group = None 267 for group in org_groups: 268 if group.id == org_group_id: 269 res_group = group 270 return res_group 271 272 def remove_group(self, group_id=None): 273 if not group_id: 274 group_id = self.group.id 275 return ccnet_threaded_rpc.remove_group(group_id, self.user.username) 276 277 def create_org(self, username=org_user_name, password=org_user_password, org_name='org', prefix='org_p', quota=100, member_limit=100): 278 new_org = ccnet_threaded_rpc.get_org_by_url_prefix(prefix) 279 if new_org: 280 return new_org 281 282 quota = int(quota) 283 User.objects.create_user(username, password, is_staff=False, is_active=True) 284 create_org(org_name, prefix, username) 285 new_org = ccnet_threaded_rpc.get_org_by_url_prefix(prefix) 286 from seahub_extra.organizations.models import OrgMemberQuota 287 OrgMemberQuota.objects.set_quota(new_org.org_id, member_limit) 288 quota = quota * get_file_size_unit('MB') 289 seafserv_threaded_rpc.set_org_quota(new_org.org_id, quota) 290 291 return new_org 292 293 def create_org_user(self, email=org_admin_name, password=org_user_password): 294 user = None 295 try: 296 user = User.objects.get(email=email) 297 except User.DoesNotExist: 298 user = User.objects.create_user(email, password, is_staff=False, is_active=True) 299 ccnet_api.add_org_user(self.org.org_id, email, 0) 300 return user 301 302 303 def create_org_admin_user(self, email="admin@org.com", password=org_admin_password): 304 admin = None 305 try: 306 admin = User.objects.get(email=email) 307 except User.DoesNotExist: 308 admin = User.objects.create_user(email, password, is_staff=False, is_active=True) 309 ccnet_api.add_org_user(self.org.org_id, email, 1) 310 return admin 311 312 def get_org_user_info(self, user, org): 313 user_info = {} 314 user_info['org_id'] = org.org_id 315 user_info['active'] = user.is_active 316 user_info['email'] = user.email 317 user_info['name'] = email2nickname(user.email) 318 user_info['contact_email'] = email2contact_email(user.email) 319 320 org_user_quota = seafile_api.get_org_user_quota(org.org_id, user.email) 321 user_info['quota_total'] = org_user_quota / get_file_size_unit('MB') 322 323 org_user_quota_usage = seafile_api.get_org_user_quota_usage(org.org_id, user.email) 324 user_info['quota_usage'] = org_user_quota_usage / get_file_size_unit('MB') 325 326 return user_info 327 328 def share_repo_to_admin_with_r_permission(self): 329 # share user's repo to admin with 'r' permission 330 seafile_api.share_repo(self.repo.id, self.user.username, 331 self.admin.username, 'r') 332 333 def share_repo_to_admin_with_rw_permission(self): 334 # share user's repo to admin with 'rw' permission 335 seafile_api.share_repo(self.repo.id, self.user.username, 336 self.admin.username, 'rw') 337 338 def share_repo_to_admin_with_admin_permission(self): 339 # share user's repo to admin with 'admin' permission 340 seafile_api.share_repo(self.repo.id, self.user.username, 341 self.admin.username, 'rw') 342 ExtraSharePermission.objects.create_share_permission(self.repo.id, self.admin.username, 'admin') 343 344 def share_org_repo_to_org_admin_with_rw_permission(self): 345 seaserv.seafserv_threaded_rpc.org_add_share(self.org.org_id, self.org_repo.repo_id, 346 self.org_user.username, self.org_admin.username, 347 'rw') 348 349 def share_org_repo_to_org_admin_with_r_permission(self): 350 seaserv.seafserv_threaded_rpc.org_add_share(self.org.org_id, self.org_repo.repo_id, 351 self.org_user.username, self.org_admin.username, 352 'r') 353 354 def set_user_folder_r_permission_to_admin(self): 355 356 # share user's repo to admin with 'rw' permission 357 seafile_api.share_repo(self.repo.id, self.user.username, 358 self.admin.username, 'rw') 359 360 # set user sub-folder 'r' permisson to admin 361 seafile_api.add_folder_user_perm(self.repo.id, 362 self.folder, 'r', self.admin.username) 363 364 # admin can visit user sub-folder with 'r' permission 365 assert seafile_api.check_permission_by_path(self.repo.id, 366 self.folder, self.admin.username) == 'r' 367 368 def set_user_folder_rw_permission_to_admin(self): 369 370 # share user's repo to admin with 'r' permission 371 seafile_api.share_repo(self.repo.id, self.user.username, 372 self.admin.username, 'r') 373 374 # set user sub-folder 'rw' permisson to admin 375 seafile_api.add_folder_user_perm(self.repo.id, 376 self.folder, 'rw', self.admin.username) 377 378 # admin can visit user sub-folder with 'rw' permission 379 assert seafile_api.check_permission_by_path(self.repo.id, 380 self.folder, self.admin.username) == 'rw' 381 382 def share_repo_to_group_with_r_permission(self): 383 seafile_api.set_group_repo( 384 self.repo.id, self.group.id, self.user.username, 'r') 385 386 def share_repo_to_group_with_rw_permission(self): 387 seafile_api.set_group_repo( 388 self.repo.id, self.group.id, self.user.username, 'rw') 389 390 def share_repo_to_group_with_admin_permission(self): 391 seafile_api.set_group_repo( 392 self.repo.id, self.group.id, self.user.username, 'rw') 393 ExtraGroupsSharePermission.objects.create_share_permission(self.repo.id, self.group.id, 'admin') 394 395 def share_org_repo_to_org_group_with_r_permission(self): 396 seafile_api.add_org_group_repo(self.org_repo.repo_id, self.org.org_id, 397 self.org_group.id, self.org_user.username, 'r') 398 399 def share_org_repo_to_org_group_with_rw_permission(self): 400 seafile_api.add_org_group_repo(self.org_repo.repo_id, self.org.org_id, 401 self.org_group.id, self.org_user.username, 'rw') 402 403 def add_admin_to_group(self): 404 ccnet_api.group_add_member( 405 self.group.id, self.user.username, self.admin.username) 406 407 assert is_group_member(self.group.id, self.admin.username) 408 409 410class BaseTestCase(TestCase, Fixtures): 411 def tearDown(self): 412 self.remove_repo(self.repo.id) 413 self.remove_repo(self.enc_repo.id) 414 415 def login_as(self, user, password=None): 416 if isinstance(user, str): 417 login = user 418 elif isinstance(user, User): 419 login = user.username 420 else: 421 assert False 422 423 password = password if password else self.user_password 424 425 return self.client.post( 426 reverse('auth_login'), {'login': login, 427 'password': password} 428 ) 429 430 def logout(self): 431 """ 432 Removes the authenticated user's cookies and session object. 433 434 Causes the authenticated user to be logged out. 435 """ 436 session = import_module(settings.SESSION_ENGINE).SessionStore() 437 session_cookie = self.client.cookies.get(settings.SESSION_COOKIE_NAME) 438 439 if session_cookie: 440 session.delete(session_key=session_cookie.value) 441 self.client.cookies = SimpleCookie() 442 443 def clear_cache(self): 444 # clear cache between every test case to avoid config option cache 445 # issue which cause test failed 446 cache.clear() 447