1import os 2import posixpath 3import seahub_settings 4from seaserv import ccnet_api as api 5from pysearpc import SearpcError 6from wsgidav.dc.seaf_utils import CCNET_CONF_DIR, SEAFILE_CENTRAL_CONF_DIR, multi_tenancy_enabled 7from wsgidav.dc import seahub_db 8import wsgidav.util as util 9from wsgidav.dc.base_dc import BaseDomainController 10from sqlalchemy.sql import exists 11# basic_auth_user, get_domain_realm, require_authentication 12_logger = util.get_module_logger(__name__) 13 14# the block size for the cipher object; must be 16, 24, or 32 for AES 15BLOCK_SIZE = 32 16 17import base64 18PADDING = b'{' 19 20# An encrypted block size must be a multiple of 16 21pad = lambda s: s + (16 - len(s) % 16) * PADDING 22# encrypt with AES, encode with base64 23EncodeAES = lambda c, s: base64.b64encode(c.encrypt(pad(s))) 24 25class SeafileDomainController(BaseDomainController): 26 27 def __init__(self, wsgidav_app, config): 28 self.session_cls = seahub_db.init_db_session_class() 29 30 def __repr__(self): 31 return self.__class__.__name__ 32 33 def supports_http_digest_auth(self): 34 # We have access to a plaintext password (or stored hash) 35 return True 36 37 def get_domain_realm(self, inputURL, environ): 38 return "Seafile Authentication" 39 40 def require_authentication(self, realmname, envrion): 41 return True 42 43 def isRealmUser(self, realmname, username, environ): 44 return True 45 46 def getRealmUserPassword(self, realmname, username, environ): 47 """ 48 Not applicable to seafile. 49 """ 50 return "" 51 52 def basic_auth_user(self, realmname, username, password, environ): 53 if "'" in username: 54 return False 55 56 try: 57 ccnet_email = None 58 session = None 59 if self.session_cls: 60 session = self.session_cls() 61 62 user = api.get_emailuser(username) 63 if user: 64 ccnet_email = user.email 65 else: 66 if session: 67 profile_profile = seahub_db.Base.classes.profile_profile 68 q = session.query(profile_profile.user).filter(profile_profile.contact_email==username) 69 res = q.first() 70 if res: 71 ccnet_email = res[0] 72 73 if not ccnet_email: 74 _logger.warning('User %s doesn\'t exist', username) 75 return False 76 77 enable_webdav_secret = False 78 if hasattr(seahub_settings, 'ENABLE_WEBDAV_SECRET'): 79 enable_webdav_secret = seahub_settings.ENABLE_WEBDAV_SECRET 80 81 enable_two_factor_auth = False 82 if session and enableTwoFactorAuth(session, ccnet_email): 83 enable_two_factor_auth = True 84 85 if not enable_webdav_secret and enable_two_factor_auth: 86 _logger.warning("Two factor auth is enabled, no access to webdav.") 87 return False 88 elif enable_webdav_secret and enable_two_factor_auth: 89 if not validateSecret(session, password, ccnet_email): 90 return False 91 elif not enable_webdav_secret and not enable_two_factor_auth: 92 if api.validate_emailuser(ccnet_email, password) != 0: 93 return False 94 else: 95 if not validateSecret(session, password, ccnet_email) and \ 96 api.validate_emailuser(ccnet_email, password) != 0: 97 return False 98 99 username = ccnet_email 100 except Exception as e: 101 _logger.warning('Failed to login: %s', e) 102 return False 103 finally: 104 if session: 105 session.close() 106 107 try: 108 user = api.get_emailuser_with_import(username) 109 if user.role == 'guest': 110 environ['seafile.is_guest'] = True 111 else: 112 environ['seafile.is_guest'] = False 113 except Exception as e: 114 _logger.exception('get_emailuser') 115 116 if multi_tenancy_enabled(): 117 try: 118 orgs = api.get_orgs_by_user(username) 119 if orgs: 120 environ['seafile.org_id'] = orgs[0].org_id 121 except Exception as e: 122 _logger.exception('get_orgs_by_user') 123 pass 124 125 environ["http_authenticator.username"] = username 126 127 return True 128 129def validateSecret(session, password, ccnet_email): 130 if not session: 131 return False 132 from Crypto.Cipher import AES 133 secret = seahub_settings.SECRET_KEY[:BLOCK_SIZE] 134 cipher = AES.new(secret.encode('utf8'), AES.MODE_ECB) 135 encoded_str = 'aes$' + EncodeAES(cipher, password.encode('utf8')).decode('utf8') 136 options_useroptions = seahub_db.Base.classes.options_useroptions 137 q = session.query(options_useroptions.email) 138 q = q.filter(options_useroptions.email==ccnet_email, 139 options_useroptions.option_val==encoded_str) 140 res = q.first() 141 if not res: 142 return False 143 return True 144 145def enableTwoFactorAuth(session, email): 146 enable_settings_via_web = True 147 if hasattr(seahub_settings, 'ENABLE_SETTINGS_VIA_WEB'): 148 enable_settings_via_web = seahub_settings.ENABLE_SETTINGS_VIA_WEB 149 150 global_two_factor_auth = False 151 if enable_settings_via_web: 152 constance_config = seahub_db.Base.classes.constance_config 153 q = session.query(constance_config.value).filter(constance_config.constance_key=='ENABLE_TWO_FACTOR_AUTH') 154 res = q.first() 155 if res: 156 if res[0] == 'gAJLAS4=': 157 global_two_factor_auth = True 158 else: 159 return False 160 elif hasattr(seahub_settings, 'ENABLE_TWO_FACTOR_AUTH'): 161 global_two_factor_auth = seahub_settings.ENABLE_TWO_FACTOR_AUTH 162 163 if global_two_factor_auth: 164 two_factor_staticdevice = seahub_db.Base.classes.two_factor_staticdevice 165 two_factor_totpdevice = seahub_db.Base.classes.two_factor_totpdevice 166 if session.query(exists().where(two_factor_staticdevice.user==email)).scalar() \ 167 or session.query(exists().where(two_factor_totpdevice.user==email)).scalar(): 168 return True 169 170 return False 171