-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expire user permissions #72
Changes from 8 commits
837b5f4
28815f5
011ebda
df017ea
3622bbf
b437aba
764d677
2d002ca
e678353
716651c
c8111d4
7ef5429
a195929
b4eb1b3
a5f2c58
e92b419
e98e66b
da46257
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,30 @@ | ||
from datetime import timedelta | ||
|
||
import superset | ||
from flask import flash, g, redirect, request, session, url_for | ||
from superset.config import USER_DOMAIN_ROLE_EXPIRY | ||
|
||
from .utils import SESSION_USER_DOMAINS_KEY | ||
from hq_superset.const import ( | ||
SESSION_DOMAIN_ROLE_LAST_SYNCED_AT, | ||
SESSION_USER_DOMAINS_KEY, | ||
) | ||
from hq_superset.utils import DomainSyncUtil, datetime_utcnow_naive | ||
|
||
|
||
def before_request_hook(): | ||
return ensure_domain_selected() | ||
""" | ||
Call all hooks functions set in sequence and | ||
if any hook returns a response, | ||
break the chain and return that response | ||
""" | ||
hooks = [ | ||
ensure_domain_selected, | ||
sync_user_domain_role | ||
] | ||
for _function in hooks: | ||
response = _function() | ||
if response: | ||
return response | ||
|
||
|
||
def after_request_hook(response): | ||
|
@@ -55,6 +75,28 @@ def ensure_domain_selected(): | |
return redirect(url_for('SelectDomainView.list', next=request.url)) | ||
|
||
|
||
def sync_user_domain_role(): | ||
if is_user_admin() or ( | ||
request.url_rule | ||
and request.url_rule.endpoint in DOMAIN_EXCLUDED_VIEWS | ||
): | ||
return | ||
if _domain_role_expired(): | ||
_sync_domain_role() | ||
|
||
|
||
def _domain_role_expired(): | ||
if not session.get(SESSION_DOMAIN_ROLE_LAST_SYNCED_AT): | ||
return True | ||
|
||
time_since_last_sync = datetime_utcnow_naive() - session[SESSION_DOMAIN_ROLE_LAST_SYNCED_AT] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mkangia How about:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool. I can add that. This value is only always set by CCA and can't edited by the user so it would certainly be a datetime but no harm in adding safety. |
||
return time_since_last_sync >= timedelta(minutes=USER_DOMAIN_ROLE_EXPIRY) | ||
|
||
|
||
def _sync_domain_role(): | ||
DomainSyncUtil(superset.appbuilder.sm).sync_domain_role(g.hq_domain) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mkangia There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ya, Great question. I guess I will follow the same pattern you have followed otherwise in case sync domain role returns false and redirect user to the select domain page, though possibly with a different error message like "Your permissions have expired and the sync failed due to an unexpected error. Please select project space again or login to continue" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This turned out to be a little challenging, but I got it working like this |
||
|
||
|
||
def is_valid_user_domain(hq_domain): | ||
# Admins have access to all domains | ||
return is_user_admin() or hq_domain in user_domains() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,15 +2,17 @@ | |
""" | ||
Base TestCase class | ||
""" | ||
|
||
import os | ||
import shutil | ||
|
||
import jwt | ||
from flask_testing import TestCase | ||
from sqlalchemy.sql import text | ||
from superset.app import create_app | ||
|
||
from hq_superset.utils import DOMAIN_PREFIX, get_hq_database | ||
from hq_superset.const import DOMAIN_PREFIX | ||
from hq_superset.tests.utils import OAuthMock | ||
from hq_superset.utils import get_hq_database | ||
|
||
superset_test_home = os.path.join(os.path.dirname(__file__), ".test_superset") | ||
shutil.rmtree(superset_test_home, ignore_errors=True) | ||
|
@@ -48,3 +50,33 @@ def tearDown(self): | |
sql = "; ".join(domain_schemas) + ";" | ||
connection.execute(text(sql)) | ||
super(HQDBTestCase, self).tearDown() | ||
|
||
|
||
class LoginUserTestMixin(object): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice. Seems useful. |
||
""" | ||
Use this mixin by calling login function with client | ||
& then logout once done for clearing the session | ||
""" | ||
def login(self, client): | ||
self._setup_user() | ||
# bypass oauth-workflow by skipping login and oauth flow | ||
with client.session_transaction() as session_: | ||
session_["oauth_state"] = "mock_state" | ||
state = jwt.encode({}, "mock_state", algorithm="HS256") | ||
return client.get(f"/oauth-authorized/commcare?state={state}", follow_redirects=True) | ||
|
||
def _setup_user(self): | ||
self.app.appbuilder.add_permissions(update_perms=True) | ||
self.app.appbuilder.sm.sync_role_definitions() | ||
|
||
self.oauth_mock = OAuthMock() | ||
self.app.appbuilder.sm.oauth_remotes = {"commcare": self.oauth_mock} | ||
|
||
gamma_role = self.app.appbuilder.sm.find_role('Gamma') | ||
self.user = self.app.appbuilder.sm.find_user(self.oauth_mock.user_json['username']) | ||
if not self.user: | ||
self.user = self.app.appbuilder.sm.add_user(**self.oauth_mock.user_json, role=[gamma_role]) | ||
|
||
@staticmethod | ||
def logout(client): | ||
return client.get("/logout/") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍