Skip to content

Commit

Permalink
update code style for wsgidav/dc/domain_controller.py
Browse files Browse the repository at this point in the history
  • Loading branch information
imwhatiam committed Mar 20, 2023
1 parent ed3dfd0 commit 41f6979
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions wsgidav/dc/domain_controller.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import os
import posixpath
import hashlib
import base64
import seahub_settings
from seaserv import ccnet_api as api
from pysearpc import SearpcError
from wsgidav.dc.seaf_utils import CCNET_CONF_DIR, SEAFILE_CENTRAL_CONF_DIR, multi_tenancy_enabled
from wsgidav.dc.seaf_utils import multi_tenancy_enabled
from wsgidav.dc import seahub_db
import wsgidav.util as util
from wsgidav.dc.base_dc import BaseDomainController
Expand All @@ -15,14 +13,15 @@
# the block size for the cipher object; must be 16, 24, or 32 for AES
BLOCK_SIZE = 32

import base64
PADDING = b'{'

# An encrypted block size must be a multiple of 16
pad = lambda s: s + (16 - len(s) % 16) * PADDING

# encrypt with AES, encode with base64
EncodeAES = lambda c, s: base64.b64encode(c.encrypt(pad(s)))


class SeafileDomainController(BaseDomainController):

def __init__(self, wsgidav_app, config):
Expand All @@ -38,7 +37,7 @@ def supports_http_digest_auth(self):
def get_domain_realm(self, inputURL, environ):
return "Seafile Authentication"

def require_authentication(self, realmname, envrion):
def require_authentication(self, realmname, environ):
return True

def isRealmUser(self, realmname, username, environ):
Expand Down Expand Up @@ -66,23 +65,24 @@ def basic_auth_user(self, realmname, username, password, environ):
else:
if session:
profile_profile = seahub_db.Base.classes.profile_profile
q = session.query(profile_profile.user).filter(profile_profile.contact_email==username)
q = session.query(profile_profile.user) \
.filter(profile_profile.contact_email == username)
res = q.first()
if res:
ccnet_email = res[0]

if not ccnet_email:
_logger.warning('User %s doesn\'t exist', username)
return False

enable_webdav_secret = False
if hasattr(seahub_settings, 'ENABLE_WEBDAV_SECRET'):
enable_webdav_secret = seahub_settings.ENABLE_WEBDAV_SECRET

enable_two_factor_auth = False
if session and enableTwoFactorAuth(session, ccnet_email):
enable_two_factor_auth = True

if not enable_webdav_secret and enable_two_factor_auth:
_logger.warning("Two factor auth is enabled, no access to webdav.")
return False
Expand All @@ -94,7 +94,7 @@ def basic_auth_user(self, realmname, username, password, environ):
return False
else:
if not validateSecret(session, password, ccnet_email) and \
api.validate_emailuser(ccnet_email, password) != 0:
api.validate_emailuser(ccnet_email, password) != 0:
return False

username = ccnet_email
Expand All @@ -111,33 +111,36 @@ def basic_auth_user(self, realmname, username, password, environ):
environ['seafile.is_guest'] = True
else:
environ['seafile.is_guest'] = False
except Exception as e:
except Exception:
_logger.exception('get_emailuser')

if multi_tenancy_enabled():
try:
orgs = api.get_orgs_by_user(username)
if orgs:
environ['seafile.org_id'] = orgs[0].org_id
except Exception as e:
except Exception:
_logger.exception('get_orgs_by_user')
pass

environ["http_authenticator.username"] = username

return True


def validateSecret(session, password, ccnet_email):

if not session:
return False

from Crypto.Cipher import AES
secret = seahub_settings.SECRET_KEY[:BLOCK_SIZE]
cipher = AES.new(secret.encode('utf8'), AES.MODE_ECB)
encoded_str = 'aes$' + EncodeAES(cipher, password.encode('utf8')).decode('utf8')
options_useroptions = seahub_db.Base.classes.options_useroptions
q = session.query(options_useroptions.option_val)
q = q.filter(options_useroptions.email==ccnet_email,
options_useroptions.option_key=='webdav_secret')
q = q.filter(options_useroptions.email == ccnet_email,
options_useroptions.option_key == 'webdav_secret')
res = q.first()
if not res:
return False
Expand All @@ -159,7 +162,9 @@ def validateSecret(session, password, ccnet_email):

return hashed_password == hash_password(password, salt, algorithm)


def hash_password(password, salt, algorithm='sha1'):

digest = hashlib.pbkdf2_hmac(algorithm,
password.encode(),
salt.encode(),
Expand All @@ -169,29 +174,35 @@ def hash_password(password, salt, algorithm='sha1'):
# sha1$QRle$5511a4e2efb7d12e1f64647f64c0c6e105d150ff
return "{}${}${}".format(algorithm, salt, hex_hash)


def enableTwoFactorAuth(session, email):

enable_settings_via_web = True
if hasattr(seahub_settings, 'ENABLE_SETTINGS_VIA_WEB'):
enable_settings_via_web = seahub_settings.ENABLE_SETTINGS_VIA_WEB

global_two_factor_auth = False
if enable_settings_via_web:
constance_config = seahub_db.Base.classes.constance_config
q = session.query(constance_config.value).filter(constance_config.constance_key=='ENABLE_TWO_FACTOR_AUTH')
q = session.query(constance_config.value) \
.filter(constance_config.constance_key == 'ENABLE_TWO_FACTOR_AUTH')
res = q.first()
if res:
if res[0] == 'gAJLAS4=':
global_two_factor_auth = True
else:
return False

elif hasattr(seahub_settings, 'ENABLE_TWO_FACTOR_AUTH'):
global_two_factor_auth = seahub_settings.ENABLE_TWO_FACTOR_AUTH

if global_two_factor_auth:

two_factor_staticdevice = seahub_db.Base.classes.two_factor_staticdevice
two_factor_totpdevice = seahub_db.Base.classes.two_factor_totpdevice
if session.query(exists().where(two_factor_staticdevice.user==email)).scalar() \
or session.query(exists().where(two_factor_totpdevice.user==email)).scalar():

if session.query(exists().where(two_factor_staticdevice.user == email)).scalar() \
or session.query(exists().where(two_factor_totpdevice.user == email)).scalar():
return True

return False

0 comments on commit 41f6979

Please sign in to comment.