import os

import logging
import posixpath
import stat
from django.utils.http import urlquote
from seaserv import seafile_api

from seahub.base.models import UserStarredFiles
from seahub.base.templatetags.seahub_tags import email2nickname, email2contact_email
from seahub.settings import ENABLE_VIDEO_THUMBNAIL, THUMBNAIL_ROOT
from seahub.thumbnail.utils import get_thumbnail_src
from seahub.utils import is_pro_version, FILEEXT_TYPE_MAP, IMAGE, XMIND, VIDEO
from seahub.utils.file_tags import get_files_tags_in_dir
from seahub.utils.repo import is_group_repo_staff, is_repo_owner
from seahub.utils.timeutils import timestamp_to_isoformat_timestr

logger = logging.getLogger(__name__)
json_content_type = 'application/json; charset=utf-8'
HTTP_520_OPERATION_FAILED = 520


def permission_check_admin_owner(request, username, repo_id):
    """Check whether ``username`` may administer the repo ``repo_id``.

    Returns True when ``is_repo_owner`` reports the user as the repo owner;
    otherwise returns whatever ``is_group_repo_staff`` reports for the same
    request, repo and user.

    More complex logic may be added here in the future.
    """
    if is_repo_owner(request, repo_id, username):
        return True
    return is_group_repo_staff(request, repo_id, username)


def _collect_dirents_recursively(repo_id, path, all_dirs, is_pro):
    """Append one entry dict per dirent under ``path`` to ``all_dirs``.

    Entries are appended depth-first: a directory's entry is immediately
    followed by the entries of everything below it.
    """
    for dirent in seafile_api.list_dir_by_path(repo_id, path, -1, -1):
        is_dir = stat.S_ISDIR(dirent.mode)

        entry = {}
        if is_dir:
            entry["type"] = 'dir'
        else:
            entry["type"] = 'file'
            entry['modifier_email'] = dirent.modifier
            entry["size"] = dirent.size

            if is_pro:
                entry["is_locked"] = dirent.is_locked
                entry["lock_owner"] = dirent.lock_owner
                if dirent.lock_owner:
                    entry["lock_owner_name"] = email2nickname(dirent.lock_owner)
                entry["lock_time"] = dirent.lock_time

        entry["parent_dir"] = path
        entry["id"] = dirent.obj_id
        entry["name"] = dirent.obj_name
        entry["mtime"] = timestamp_to_isoformat_timestr(dirent.mtime)

        all_dirs.append(entry)

        if is_dir:
            sub_path = posixpath.join(path, dirent.obj_name)
            _collect_dirents_recursively(repo_id, sub_path, all_dirs, is_pro)


def get_dir_file_recursively(repo_id, path, all_dirs):
    """Recursively list ``path`` in repo ``repo_id`` into ``all_dirs``.

    Args:
        repo_id: id of the repo to list.
        path: directory path inside the repo to start from.
        all_dirs: list that receives the entry dicts; it is mutated in place.

    Returns:
        The same ``all_dirs`` list. Every entry has ``type``, ``parent_dir``,
        ``id``, ``name`` and ``mtime``. File entries additionally carry
        ``modifier_email``, ``size``, ``modifier_contact_email`` and
        ``modifier_name``, plus lock information when ``is_pro_version()``
        is true.
    """
    first_new = len(all_dirs)
    _collect_dirents_recursively(repo_id, path, all_dirs, is_pro_version())

    # Resolve each distinct modifier only once, after the whole tree has been
    # collected, to keep the number of nickname / contact-email lookups low.
    new_files = [item for item in all_dirs[first_new:] if item['type'] == 'file']
    contact_email_dict = {}
    nickname_dict = {}
    for email in {item['modifier_email'] for item in new_files}:
        contact_email_dict[email] = email2contact_email(email)
        nickname_dict[email] = email2nickname(email)

    for item in new_files:
        item['modifier_contact_email'] = contact_email_dict.get(item['modifier_email'], '')
        item['modifier_name'] = nickname_dict.get(item['modifier_email'], '')

    return all_dirs


def get_dir_file_info_list(username, request_type, repo_obj, parent_dir,
                           with_thumbnail, thumbnail_size):
    """Build info dicts for the direct children of ``parent_dir``.

    Args:
        username: user the listing is done for; also used to look up starred
            items and compared with each file's lock owner.
        request_type: ``'d'`` to return only directories, ``'f'`` to return
            only files, a falsy value to return both. Any other value yields
            two empty lists.
        repo_obj: repo object; its ``id`` and ``encrypted`` attributes are read.
        parent_dir: directory path inside the repo.
        with_thumbnail: when truthy (and the repo is not encrypted), add
            ``encoded_thumbnail_src`` to files whose thumbnail already exists
            on disk.
        thumbnail_size: thumbnail size used to locate the thumbnail file and
            build its src.

    Returns:
        A ``(dir_info_list, file_info_list)`` tuple, each list sorted
        case-insensitively by ``name``.

    Failures while loading starred items or file tags are logged and treated
    as "no starred items" / "no tags" instead of being raised.
    """
    repo_id = repo_obj.id
    dir_info_list = []
    file_info_list = []

    # get dirent (folder and file) list
    parent_dir_id = seafile_api.get_dir_id_by_path(repo_id, parent_dir)
    dir_file_list = seafile_api.list_dir_with_perm(
        repo_id, parent_dir, parent_dir_id, username, -1, -1)

    try:
        starred_items = UserStarredFiles.objects.filter(
            email=username, repo_id=repo_id,
            path__startswith=parent_dir, org_id=-1)
        # A set keeps the per-dirent "is it starred" check O(1).
        starred_paths = {item.path.rstrip('/') for item in starred_items}
    except Exception as e:
        logger.error(e)
        starred_paths = set()

    # only get dir info list
    if not request_type or request_type == 'd':
        for dirent in dir_file_list:
            if not stat.S_ISDIR(dirent.mode):
                continue

            dir_path = posixpath.join(parent_dir, dirent.obj_name)
            dir_info_list.append({
                "type": "dir",
                "id": dirent.obj_id,
                "name": dirent.obj_name,
                "mtime": timestamp_to_isoformat_timestr(dirent.mtime),
                "permission": dirent.permission,
                "parent_dir": parent_dir,
                'starred': dir_path.rstrip('/') in starred_paths,
            })

    # only get file info list
    if not request_type or request_type == 'f':

        file_list = [dirent for dirent in dir_file_list
                     if not stat.S_ISDIR(dirent.mode)]

        # Use dict to reduce memcache fetch cost in large for-loop.
        nickname_dict = {}
        contact_email_dict = {}
        modifier_set = {x.modifier for x in file_list}
        lock_owner_set = {x.lock_owner for x in file_list}
        for email in modifier_set | lock_owner_set:
            nickname_dict[email] = email2nickname(email)
            contact_email_dict[email] = email2contact_email(email)

        try:
            files_tags_in_dir = get_files_tags_in_dir(repo_id, parent_dir)
        except Exception as e:
            logger.error(e)
            files_tags_in_dir = {}

        # Loop invariants, evaluated once instead of once per file.
        is_pro = is_pro_version()
        want_thumbnail = with_thumbnail and not repo_obj.encrypted

        for dirent in file_list:

            file_name = dirent.obj_name
            file_path = posixpath.join(parent_dir, file_name)
            file_obj_id = dirent.obj_id
            modifier_email = dirent.modifier

            file_info = {
                "type": "file",
                "id": file_obj_id,
                "name": file_name,
                "mtime": timestamp_to_isoformat_timestr(dirent.mtime),
                "permission": dirent.permission,
                "parent_dir": parent_dir,
                "size": dirent.size,
                'modifier_email': modifier_email,
                'modifier_name': nickname_dict.get(modifier_email, ''),
                'modifier_contact_email': contact_email_dict.get(modifier_email, ''),
            }

            # get lock info
            if is_pro:
                file_info["is_locked"] = dirent.is_locked
                file_info["lock_time"] = dirent.lock_time

                lock_owner_email = dirent.lock_owner or ''
                file_info["lock_owner"] = lock_owner_email
                file_info['lock_owner_name'] = nickname_dict.get(lock_owner_email, '')
                file_info['lock_owner_contact_email'] = contact_email_dict.get(lock_owner_email, '')
                file_info["locked_by_me"] = username == lock_owner_email

            # get star info
            file_info['starred'] = file_path.rstrip('/') in starred_paths

            # get tag info
            file_tags = files_tags_in_dir.get(file_name, [])
            if file_tags:
                file_info['file_tags'] = list(file_tags)

            # get thumbnail info
            if want_thumbnail:

                # used for providing a way to determine
                # if send a request to create thumbnail.

                file_ext = os.path.splitext(file_name)[1][1:].lower()
                file_type = FILEEXT_TYPE_MAP.get(file_ext)

                if file_type in (IMAGE, XMIND) or \
                        (file_type == VIDEO and ENABLE_VIDEO_THUMBNAIL):

                    # if thumbnail has already been created, return its src.
                    # Then web browser will use this src to get thumbnail
                    # instead of recreating it.
                    thumbnail_file_path = os.path.join(
                        THUMBNAIL_ROOT, str(thumbnail_size), file_obj_id)
                    if os.path.exists(thumbnail_file_path):
                        src = get_thumbnail_src(repo_id, thumbnail_size, file_path)
                        file_info['encoded_thumbnail_src'] = urlquote(src)

            file_info_list.append(file_info)

    dir_info_list.sort(key=lambda x: x['name'].lower())
    file_info_list.sort(key=lambda x: x['name'].lower())

    return dir_info_list, file_info_list