From 03a2a0386a6a2e64f9578d9ff3b87b32d7732b4d Mon Sep 17 00:00:00 2001 From: Rick Elrod Date: Tue, 4 Jun 2024 18:12:50 +0200 Subject: [PATCH] Lock down OAuth2 views Doing this properly with RBAC depends on #424. For now, we limit Application views to superusers. We limit Token views using a custom DRF permission class based on what was specified in AWX's access.py Signed-off-by: Rick Elrod --- .../0004_alter_oauth2accesstoken_scope.py | 18 + .../oauth2_provider/models/access_token.py | 6 - .../oauth2_provider/serializers/token.py | 2 +- .../oauth2_provider/views/application.py | 7 +- ansible_base/oauth2_provider/views/token.py | 38 +- test_app/tests/oauth2_provider/test_rbac.py | 405 ++++++++++++++++++ 6 files changed, 466 insertions(+), 10 deletions(-) create mode 100644 ansible_base/oauth2_provider/migrations/0004_alter_oauth2accesstoken_scope.py create mode 100644 test_app/tests/oauth2_provider/test_rbac.py diff --git a/ansible_base/oauth2_provider/migrations/0004_alter_oauth2accesstoken_scope.py b/ansible_base/oauth2_provider/migrations/0004_alter_oauth2accesstoken_scope.py new file mode 100644 index 000000000..795cff2f2 --- /dev/null +++ b/ansible_base/oauth2_provider/migrations/0004_alter_oauth2accesstoken_scope.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.11 on 2024-06-04 12:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dab_oauth2_provider', '0003_remove_oauth2application_logo_data'), + ] + + operations = [ + migrations.AlterField( + model_name='oauth2accesstoken', + name='scope', + field=models.CharField(blank=True, default='write', help_text="Allowed scopes, further restricts user's permissions. Must be a simple space-separated string with allowed scopes ['read', 'write'].", max_length=32), + ), + ] diff --git a/ansible_base/oauth2_provider/models/access_token.py b/ansible_base/oauth2_provider/models/access_token.py index 175cc2baa..89998ff39 100644 --- a/ansible_base/oauth2_provider/models/access_token.py +++ b/ansible_base/oauth2_provider/models/access_token.py @@ -26,11 +26,6 @@ class Meta(oauth2_models.AbstractAccessToken.Meta): ordering = ('id',) swappable = "OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL" - SCOPE_CHOICES = [ - ('read', _('Read')), - ('write', _('Write')), - ] - user = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, @@ -60,7 +55,6 @@ class Meta(oauth2_models.AbstractAccessToken.Meta): blank=True, default='write', max_length=32, - choices=SCOPE_CHOICES, help_text=_("Allowed scopes, further restricts user's permissions. Must be a simple space-separated string with allowed scopes ['read', 'write']."), ) token = prevent_search( diff --git a/ansible_base/oauth2_provider/serializers/token.py b/ansible_base/oauth2_provider/serializers/token.py index 150437f07..6e6d3128d 100644 --- a/ansible_base/oauth2_provider/serializers/token.py +++ b/ansible_base/oauth2_provider/serializers/token.py @@ -22,7 +22,7 @@ class BaseOAuth2TokenSerializer(CommonModelSerializer): refresh_token = SerializerMethodField() token = SerializerMethodField() - ALLOWED_SCOPES = [x[0] for x in OAuth2AccessToken.SCOPE_CHOICES] + ALLOWED_SCOPES = ['read', 'write'] class Meta: model = OAuth2AccessToken diff --git a/ansible_base/oauth2_provider/views/application.py b/ansible_base/oauth2_provider/views/application.py index 25c5919ef..bdb3bcd3b 100644 --- a/ansible_base/oauth2_provider/views/application.py +++ b/ansible_base/oauth2_provider/views/application.py @@ -1,7 +1,7 @@ -from rest_framework import permissions from rest_framework.viewsets import ModelViewSet from ansible_base.lib.utils.views.django_app_api import AnsibleBaseDjangoAppApiView +from ansible_base.lib.utils.views.permissions import IsSuperuser from ansible_base.oauth2_provider.models import OAuth2Application from ansible_base.oauth2_provider.serializers import OAuth2ApplicationSerializer @@ -9,4 +9,7 @@ class OAuth2ApplicationViewSet(AnsibleBaseDjangoAppApiView, ModelViewSet): queryset = OAuth2Application.objects.all() serializer_class = OAuth2ApplicationSerializer - permission_classes = [permissions.IsAuthenticated] + permission_classes = [IsSuperuser] + + def filter_queryset(self, queryset): + return super().filter_queryset(OAuth2Application.access_qs(self.request.user, queryset=queryset)) diff --git a/ansible_base/oauth2_provider/views/token.py b/ansible_base/oauth2_provider/views/token.py index 021823797..ede407a48 100644 --- a/ansible_base/oauth2_provider/views/token.py +++ b/ansible_base/oauth2_provider/views/token.py @@ -51,7 +51,43 @@ def create_token_response(self, request): return request.build_absolute_uri(), {}, str(e), e.status_code +class OAuth2TokenPermission(permissions.BasePermission): + # An app token is a token that has an application attached to it + # A personal access token (PAT) is a token with no application attached to it + # With that in mind: + # - An app token can be read, changed, or deleted if: + # - I am the superuser + # - I am the admin of the organization of the application of the token + # - I am the user of the token + # - An app token can be created if: + # - I am the superuser + # - A PAT can be read, changed, or deleted if: + # - I am the superuser + # - I am the user of the token + # - A PAT can be created if: + # - I am a user + + def has_permission(self, request, view): + # Handle PAT and app token creation separately + if request.method == 'POST': + if request.data.get('application'): + return request.user.is_superuser + return request.user.is_authenticated + return request.user.is_authenticated + + def has_object_permission(self, request, view, obj): + if request.method in ['GET', 'PATCH', 'DELETE']: + if request.user.is_superuser: + return True + if obj.application: + if obj.application.organization.admins.filter(id=request.user.id).exists(): + return True + return obj.user == request.user + return request.user == obj.user + return False + + class OAuth2TokenViewSet(ModelViewSet, AnsibleBaseDjangoAppApiView): queryset = OAuth2AccessToken.objects.all() serializer_class = OAuth2TokenSerializer - permission_classes = [permissions.IsAuthenticated] + permission_classes = [OAuth2TokenPermission] diff --git a/test_app/tests/oauth2_provider/test_rbac.py b/test_app/tests/oauth2_provider/test_rbac.py new file mode 100644 index 000000000..63bb86091 --- /dev/null +++ b/test_app/tests/oauth2_provider/test_rbac.py @@ -0,0 +1,405 @@ +""" +This module tests RBAC against OAuth2 views and models. +The exact set of expected access comes directly from awx.main.access +and the doc strings of the relevant classes defined there are directly turned +into tests here. + +Some of these tests might be redundant with other tests defined elsewhere. +This is intentional as this module is meant to check every single condition +defined within the AWX spec (as well as additional cases where the access +conditions are not met). +""" + +import pytest +from django.urls import reverse + +from ansible_base.oauth2_provider.models import OAuth2AccessToken + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + pytest.param('admin_of_app_user_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + pytest.param('user_in_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_read_change_delete( + request, user_case, has_access, org_member_rd, org_admin_rd, random_user, oauth2_application, organization, unauthenticated_api_client +): + """ + From awx.main.access.OAuth2ApplicationAccess: + + I can read, change or delete OAuth 2 applications when: + - I am a superuser. + - I am the admin of the organization of the user of the application. + - I am a user in the organization of the application. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_read_status = 200 if has_access else 403 + expected_change_status = 200 if has_access else 403 + expected_delete_status = 204 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + second_org = request.getfixturevalue('organization_1') + if user_case == 'admin_of_app_user_org': + # - I am the admin of the organization of the user of the application. + second_org.users.add(app.user) # Add the app owner user to the org + second_org.users.add(user) # Add me to the org + second_org.admins.add(user) # And make me an admin of the org + elif user_case == 'user_in_org': + # - I am a user in the organization of the application. + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_read_status = 401 + expected_change_status = 401 + expected_delete_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("application-detail", args=[app.id]) + + # Read + response = unauthenticated_api_client.get(url) + assert response.status_code == expected_read_status, (response.status_code, response.data) + + # Change + response = unauthenticated_api_client.patch(url, data={'name': 'new name'}) + assert response.status_code == expected_change_status, (response.status_code, response.data) + + # Delete + response = unauthenticated_api_client.delete(url) + assert response.status_code == expected_delete_status, (response.status_code, response.data) + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + pytest.param('admin_of_app_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + ('user_in_org', False), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_create( + request, user_case, has_access, org_member_rd, org_admin_rd, random_user, oauth2_application, organization, unauthenticated_api_client +): + """ + From awx.main.access.OAuth2ApplicationAccess: + + I can create OAuth 2 applications when: + - I am a superuser. + - I am the admin of the organization of the application. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_create_status = 201 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + if user_case == 'admin_of_app_org': + # - I am the admin of the organization of the application. + organization.users.add(user) # Add me to the org + organization.admins.add(user) # And make me an admin of the org + elif user_case == 'user_in_org': + # - I am a user in the organization of the application. + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_create_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("application-list") + data = { + 'name': 'new app', + 'organization': organization.id, + 'client_type': 'confidential', + 'authorization_grant_type': 'password', + } + if user is not None: + data['user'] = user.id + response = unauthenticated_api_client.post(url, data=data) + assert response.status_code == expected_create_status, (response.status_code, response.data) + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + ('admin_of_token_app_org', True), + ('user_of_token', True), + ('user_in_app_org', False), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_token_read_change_delete( + request, + user_case, + has_access, + org_member_rd, + org_admin_rd, + random_user, + oauth2_application, + oauth2_user_application_token, + organization, + unauthenticated_api_client, +): + """ + From awx.main.access.OAuth2TokenAccess: + + I can read, change or delete an app token when: + - I am a superuser. + - I am the admin of the organization of the application of the token. + - I am the user of the token. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_read_status = 200 if has_access else 403 + expected_change_status = 200 if has_access else 403 + expected_delete_status = 204 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + if user_case == 'admin_of_token_app_org': + # - I am the admin of the organization of the application of the token. + organization.users.add(user) # Add me to the org + organization.admins.add(user) # And make me an admin of the org + elif user_case == 'user_of_token': + # - I am the user of the token. + user = oauth2_user_application_token.user + elif user_case == 'user_in_app_org': + # Negative case, user is in the org but not an admin or the token user + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_read_status = 401 + expected_change_status = 401 + expected_delete_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("token-detail", args=[oauth2_user_application_token.id]) + + # Read + response = unauthenticated_api_client.get(url) + assert response.status_code == expected_read_status, (response.status_code, response.data) + + # Change + response = unauthenticated_api_client.patch(url, data={'description': 'new description'}) + assert response.status_code == expected_change_status, (response.status_code, response.data) + + # Delete + response = unauthenticated_api_client.delete(url) + assert response.status_code == expected_delete_status, (response.status_code, response.data) + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + pytest.param('admin_of_app_user_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + pytest.param('user_in_app_org', True, marks=pytest.mark.xfail(reason="https://github.com/ansible/django-ansible-base/issues/424")), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_application_token_create( + request, user_case, has_access, org_member_rd, org_admin_rd, random_user, oauth2_application, organization, unauthenticated_api_client +): + """ + From awx.main.access.OAuth2TokenAccess: + + I can create an OAuth2 app token when: + - I have the read permission of the related application. + + --- + + Inheriting from app access: + I can read OAuth 2 applications when: + - I am a superuser. + - I am the admin of the organization of the user of the application. + - I am a user in the organization of the application. + """ + app = oauth2_application[0] + app.organization = organization + app.user = random_user + app.save() + + expected_create_status = 201 if has_access else 403 + + if user_case == 'superuser': + # - I am a superuser. + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + else: + user = request.getfixturevalue('random_user_1') + second_org = request.getfixturevalue('organization_1') + if user_case == 'admin_of_app_user_org': + # - I am the admin of the organization of the user of the application. + second_org.users.add(app.user) # Add the app owner user to the org + second_org.users.add(user) # Add me to the org + second_org.admins.add(user) # And make me an admin of the org + elif user_case == 'user_in_app_org': + # - I am a user in the organization of the application. + organization.users.add(user) # Add me to the org + elif user_case == 'user': + # Negative case, do nothing, the user is just some random user + pass + elif user_case == 'anon': + user = None + expected_create_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("token-list") + + # Create + data = { + 'application': app.id, + 'description': 'new token', + 'scope': 'read write', + } + response = unauthenticated_api_client.post(url, data=data) + assert response.status_code == expected_create_status, (response.status_code, response.data) + + +@pytest.mark.django_db +def test_oauth2_pat_create(request, org_member_rd, org_admin_rd, user, random_user, user_api_client): + """ + From awx.main.access.OAuth2TokenAccess: + + I can create an OAuth2 Personal Access Token when: + - I am a user. But I can only create a PAT for myself. + """ + + url = reverse("token-list") + + # Create PAT + data = { + 'description': 'new PAT', + 'scope': 'read write', + } + response = user_api_client.post(url, data=data) + assert response.status_code == 201 + token_id = response.data['id'] + token = OAuth2AccessToken.objects.get(id=token_id) + assert token.user == user + + # Create PAT with another user + data = { + 'description': 'new PAT that is not mine', + 'scope': 'read write', + 'user': random_user.id, + } + response = user_api_client.post(url, data=data) + # We don't block the request but we force the user to be the requester + assert response.status_code == 201 + token_id = response.data['id'] + token = OAuth2AccessToken.objects.get(id=token_id) + assert token.user == user # not random_user + + +@pytest.mark.parametrize( + 'user_case, has_access', + [ + ('superuser', True), + ('user_of_token', True), + ('user', False), + ('anon', False), + ], +) +@pytest.mark.django_db +def test_oauth2_pat_read_change_delete(request, user_case, has_access, org_member_rd, org_admin_rd, unauthenticated_api_client, oauth2_user_pat): + """ + From awx.main.access.OAuth2TokenAccess: + + I can read, change or delete a personal token when: + - I am the user of the token + - I am the superuser + """ + expected_read_status = 200 if has_access else 403 + expected_change_status = 200 if has_access else 403 + expected_delete_status = 204 if has_access else 403 + + # Determine the user based on the test case (adding them to organizations, etc. as necessary). + if user_case == 'superuser': + # - I am the superuser + user = request.getfixturevalue('admin_user') # Nothing to do, just use the admin user + elif user_case == 'user_of_token': + # - I am the user of the token + user = oauth2_user_pat.user + elif user_case == 'user': + # Negative case, the user is just some random user + user = request.getfixturevalue('random_user') + elif user_case == 'anon': + user = None + expected_read_status = 401 + expected_change_status = 401 + expected_delete_status = 401 + else: + raise ValueError(f"Invalid user_case: {user_case}") + + if user is not None: + unauthenticated_api_client.force_login(user) + url = reverse("token-detail", args=[oauth2_user_pat.id]) + + # Read + response = unauthenticated_api_client.get(url) + assert response.status_code == expected_read_status, (response.status_code, response.data) + + # Change + response = unauthenticated_api_client.patch(url, data={'description': 'new description'}) + assert response.status_code == expected_change_status, (response.status_code, response.data) + + # Delete + response = unauthenticated_api_client.delete(url) + assert response.status_code == expected_delete_status, (response.status_code, response.data)