diff --git a/jasmin_services/api/serializers.py b/jasmin_services/api/serializers.py index 8847a37..e55dc78 100644 --- a/jasmin_services/api/serializers.py +++ b/jasmin_services/api/serializers.py @@ -119,3 +119,23 @@ class Meta: "ceda_managed", ] extra_kwargs = {"url": {"view_name": "category-services-detail", "lookup_field": "name"}} + + +class UserGrantSerializer(rf_serial.ModelSerializer): + """Simple details about a service.""" + + service = ServiceListSerializer(source="access.role.service", read_only=True) + role = RoleListSerializer(source="access.role", read_only=True) + + class Meta: + model = models.Grant + fields = [ + "id", + "service", + "role", + "granted_at", + "expires", + "revoked", + "revoked_at", + "user_reason", + ] diff --git a/jasmin_services/api/urls.py b/jasmin_services/api/urls.py index a62d339..93fb7c2 100644 --- a/jasmin_services/api/urls.py +++ b/jasmin_services/api/urls.py @@ -2,7 +2,6 @@ import types -import django.urls import rest_framework.routers as rf_routers import rest_framework_nested.routers @@ -40,6 +39,12 @@ apiviews.UserServicesViewSet, basename="users-services", ) +# Register route to get user's grants. +users_router.register( + "grants", + apiviews.UserGrantsViewSet, + basename="users-grants", +) # Create a route for accesing service by id. primary_router.register( diff --git a/jasmin_services/api/views.py b/jasmin_services/api/views.py index 2352168..df8972a 100644 --- a/jasmin_services/api/views.py +++ b/jasmin_services/api/views.py @@ -2,7 +2,6 @@ import datetime as dt -import django.contrib.auth import django.db.models as dj_models import django.utils.timezone import drf_spectacular.utils @@ -173,6 +172,64 @@ def get_queryset(self): ).distinct() +@drf_spectacular.utils.extend_schema_view( + list=drf_spectacular.utils.extend_schema( + parameters=[ + drf_spectacular.utils.OpenApiParameter( + name="service", + required=False, + type=str, + description="Name of the service you would like to filter the grants for.", + ), + drf_spectacular.utils.OpenApiParameter( + name="category", + required=False, + type=str, + description="Name of the category you would like to filter the grants for.", + ), + drf_spectacular.utils.OpenApiParameter( + name="role", + required=False, + type=str, + description="Name of the role you would like to filter the grants for.", + ), + ] + ), +) +class UserGrantsViewSet(rf_mixins.ListModelMixin, rf_viewsets.GenericViewSet): + """Get the grants associated with a user.""" + + required_scopes = ["jasmin.services.userservices.all"] + serializer_class = serializers.UserGrantSerializer + + def get_queryset(self): + queryset = models.Grant.objects.filter( + access__user__username=self.kwargs["user_username"], + revoked=False, + expires__gte=dt.datetime.now(), + ).prefetch_related("access__role__service") + + filter_params = {} + + # Option to filter by service query param + service = self.request.query_params.get("service") + if service is not None: + filter_params["access__role__service__name"] = service + + # Option to filter by category query param + category = self.request.query_params.get("category") + if category is not None: + filter_params["access__role__service__category__name"] = category + + # Option to filter by grant role name + role = self.request.query_params.get("role") + if role is not None: + filter_params["access__role__name"] = role + + queryset = queryset.filter(**filter_params) + return queryset + + class CategoriesViewSet( jasmin_django_utils.api.viewsets.ActionSerializerMixin, rf_viewsets.ReadOnlyModelViewSet, diff --git a/jasmin_services/notifications.py b/jasmin_services/notifications.py index 154eb5c..a99000d 100644 --- a/jasmin_services/notifications.py +++ b/jasmin_services/notifications.py @@ -157,17 +157,20 @@ def request_rejected(sender, instance, created, **kwargs): def grant_created(sender, instance, created, **kwargs): """Notifies the user when a grant is created.""" if created and instance.active and not re.match(r"train\d{3}", instance.access.user.username): - instance.access.user.notify( - "grant_created", - instance, - reverse( - "jasmin_services:service_details", - kwargs={ - "category": instance.access.role.service.category.name, - "service": instance.access.role.service.name, - }, - ), - ) + try: + instance.access.user.notify( + "grant_created", + instance, + reverse( + "jasmin_services:service_details", + kwargs={ + "category": instance.access.role.service.category.name, + "service": instance.access.role.service.name, + }, + ), + ) + except AttributeError: + pass @receiver(signals.post_save, sender=Grant) diff --git a/jasmin_services/tests/test_api.py b/jasmin_services/tests/test_api.py index 14c3c32..6759cb8 100644 --- a/jasmin_services/tests/test_api.py +++ b/jasmin_services/tests/test_api.py @@ -3,12 +3,23 @@ import datetime as dt from zoneinfo import ZoneInfo +import django.conf +import django.contrib.auth import oauth2_provider.models import rest_framework.test as rf_test +import jasmin_metadata.models + from .. import models from ..api import scopes +DJANGO_TZ = ZoneInfo(django.conf.settings.TIME_ZONE) + +# Python <3.12 does not support the %:z strftime format. +# So we must construct the correct offset string ourselves. +utc_offset_amount = int(dt.datetime.now(tz=DJANGO_TZ).utcoffset().total_seconds() / 3600) +utc_offset = f"+{utc_offset_amount:02}:00" + class BaseTest(rf_test.APITestCase): """Base test with two services category, access token, etc.""" @@ -30,23 +41,51 @@ def setUp(self): token="access_token", expires=dt.datetime.now(tz=ZoneInfo("Etc/GMT")) + dt.timedelta(days=1), ) - + # Make a user. + UserModel = django.contrib.auth.get_user_model() + self.user = UserModel.objects.create( + username="testuser", + email="testuser@example.com", + ) + metaform = jasmin_metadata.models.Form.objects.create(name="test") # Populate some data. - self.category = models.Category.objects.create( - name="test_cat", long_name="Meow", position=1 + self.category1 = models.Category.objects.create( + name="test_cat1", long_name="Meow", position=1 + ) + self.category2 = models.Category.objects.create( + name="test_cat2", long_name="Woof", position=1 ) self.service1 = models.Service.objects.create( - category=self.category, + category=self.category1, name="testservice1", summary="First test category", description="This should be a long description.", ) self.service2 = models.Service.objects.create( - category=self.category, + category=self.category2, name="testservice2", summary="Another test category", description="This should be a long description.", ) + # Grant the user roles within the services. + manager_role = models.Role.objects.create( + service=self.service1, name="MANAGER", description="Manager.", metadata_form=metaform + ) + manager_access = models.Access.objects.create(user=self.user, role=manager_role) + self.manager_grant = models.Grant.objects.create( + access=manager_access, + granted_by=self.user.username, + expires=dt.date.today() + dt.timedelta(days=365), + ) + deputy_role = models.Role.objects.create( + service=self.service2, name="DEPUTY", description="Deputy.", metadata_form=metaform + ) + deputy_access = models.Access.objects.create(user=self.user, role=deputy_role) + self.deputy_grant = models.Grant.objects.create( + access=deputy_access, + granted_by=self.user.username, + expires=dt.date.today() + dt.timedelta(days=365), + ) super().setUp() def tearDown(self): @@ -69,9 +108,9 @@ def test_services_list(self): "id": self.service1.id, "url": f"http://testserver/api/v1/categories/{self.service1.category.name}/services/{self.service1.name}/", "category": { - "id": self.category.id, - "url": "http://testserver/api/v1/categories/test_cat/", - "name": "test_cat", + "id": self.category1.id, + "url": "http://testserver/api/v1/categories/test_cat1/", + "name": "test_cat1", }, "name": "testservice1", "summary": "First test category", @@ -81,9 +120,9 @@ def test_services_list(self): "id": self.service2.id, "url": f"http://testserver/api/v1/categories/{self.service2.category.name}/services/{self.service2.name}/", "category": { - "id": self.category.id, - "url": "http://testserver/api/v1/categories/test_cat/", - "name": "test_cat", + "id": self.category2.id, + "url": "http://testserver/api/v1/categories/test_cat2/", + "name": "test_cat2", }, "name": "testservice2", "summary": "Another test category", @@ -105,11 +144,11 @@ def test_services_detail(self): "url": f"http://testserver/api/v1/categories/{self.service1.category.name}/services/{self.service1.name}/", "name": "testservice1", "category": { - "id": self.category.id, - "url": "http://testserver/api/v1/categories/test_cat/", - "name": "test_cat", + "id": self.category1.id, + "url": "http://testserver/api/v1/categories/test_cat1/", + "name": "test_cat1", }, - "roles": [], + "roles": [{"id": 1, "name": "MANAGER"}], "summary": "First test category", "description": "This should be a long description.", "approver_message": "", @@ -119,3 +158,172 @@ def test_services_detail(self): "ceda_managed": False, }, ) + + +class UserGrantsTest(BaseTest): + def test_user(self): + """Test the user grants API endpoint.""" + response = self.client.get( + f"/api/v1/users/{self.user.username}/grants/", + HTTP_AUTHORIZATION=f"Bearer {self.token.token}", + ) + self.assertEqual(response.status_code, 200) + self.assertListEqual( + response.json(), + [ + { + "id": self.manager_grant.id, + "service": { + "id": self.service1.id, + "url": "http://testserver/api/v1/categories/test_cat1/services/testservice1/", + "category": { + "id": self.category1.id, + "url": "http://testserver/api/v1/categories/test_cat1/", + "name": "test_cat1", + }, + "name": "testservice1", + "summary": "First test category", + "hidden": True, + }, + "role": {"id": 1, "name": "MANAGER"}, + "granted_at": self.manager_grant.granted_at.astimezone(DJANGO_TZ).strftime( + f"%Y-%m-%dT%H:%M:%S.%f{utc_offset}" + ), + "expires": self.manager_grant.expires.strftime("%Y-%m-%d"), + "revoked": False, + "revoked_at": None, + "user_reason": "", + }, + { + "id": self.deputy_grant.id, + "service": { + "id": self.service2.id, + "url": "http://testserver/api/v1/categories/test_cat2/services/testservice2/", + "category": { + "id": self.category2.id, + "url": "http://testserver/api/v1/categories/test_cat2/", + "name": "test_cat2", + }, + "name": "testservice2", + "summary": "Another test category", + "hidden": True, + }, + "role": {"id": 2, "name": "DEPUTY"}, + "granted_at": self.deputy_grant.granted_at.astimezone(DJANGO_TZ).strftime( + f"%Y-%m-%dT%H:%M:%S.%f{utc_offset}" + ), + "expires": self.deputy_grant.expires.strftime("%Y-%m-%d"), + "revoked": False, + "revoked_at": None, + "user_reason": "", + }, + ], + ) + + def test_grants_filter_category(self): + """Test that filtering with category query param works.""" + response = self.client.get( + f"/api/v1/users/{self.user.username}/grants/?category={self.category1.name}", + HTTP_AUTHORIZATION=f"Bearer {self.token.token}", + ) + self.assertEqual(response.status_code, 200) + self.assertListEqual( + response.json(), + [ + { + "id": self.manager_grant.id, + "service": { + "id": self.service1.id, + "url": "http://testserver/api/v1/categories/test_cat1/services/testservice1/", + "category": { + "id": self.category1.id, + "url": "http://testserver/api/v1/categories/test_cat1/", + "name": "test_cat1", + }, + "name": "testservice1", + "summary": "First test category", + "hidden": True, + }, + "role": {"id": 1, "name": "MANAGER"}, + "granted_at": self.manager_grant.granted_at.astimezone(DJANGO_TZ).strftime( + f"%Y-%m-%dT%H:%M:%S.%f{utc_offset}" + ), + "expires": self.manager_grant.expires.strftime("%Y-%m-%d"), + "revoked": False, + "revoked_at": None, + "user_reason": "", + } + ], + ) + + def test_grants_filter_service(self): + """Test that filtering with service query param works.""" + response = self.client.get( + f"/api/v1/users/{self.user.username}/grants/?service={self.service2.name}", + HTTP_AUTHORIZATION=f"Bearer {self.token.token}", + ) + self.assertEqual(response.status_code, 200) + self.assertListEqual( + response.json(), + [ + { + "id": self.deputy_grant.id, + "service": { + "id": self.service2.id, + "url": "http://testserver/api/v1/categories/test_cat2/services/testservice2/", + "category": { + "id": self.category2.id, + "url": "http://testserver/api/v1/categories/test_cat2/", + "name": "test_cat2", + }, + "name": "testservice2", + "summary": "Another test category", + "hidden": True, + }, + "role": {"id": 2, "name": "DEPUTY"}, + "granted_at": self.deputy_grant.granted_at.astimezone(DJANGO_TZ).strftime( + f"%Y-%m-%dT%H:%M:%S.%f{utc_offset}" + ), + "expires": self.deputy_grant.expires.strftime("%Y-%m-%d"), + "revoked": False, + "revoked_at": None, + "user_reason": "", + } + ], + ) + + def test_grants_filter_role(self): + """Test that filtering with role query param works.""" + response = self.client.get( + f"/api/v1/users/{self.user.username}/grants/?role=MANAGER", + HTTP_AUTHORIZATION=f"Bearer {self.token.token}", + ) + self.assertEqual(response.status_code, 200) + self.assertListEqual( + response.json(), + [ + { + "id": self.manager_grant.id, + "service": { + "id": self.service1.id, + "url": "http://testserver/api/v1/categories/test_cat1/services/testservice1/", + "category": { + "id": self.category1.id, + "url": "http://testserver/api/v1/categories/test_cat1/", + "name": "test_cat1", + }, + "name": "testservice1", + "summary": "First test category", + "hidden": True, + }, + "role": {"id": 1, "name": "MANAGER"}, + "granted_at": self.manager_grant.granted_at.astimezone(DJANGO_TZ).strftime( + f"%Y-%m-%dT%H:%M:%S.%f{utc_offset}" + ), + "expires": self.manager_grant.expires.strftime("%Y-%m-%d"), + "revoked": False, + "revoked_at": None, + "user_reason": "", + } + ], + ) diff --git a/jasmin_services/views/service_details.py b/jasmin_services/views/service_details.py index bdca4d0..5bf88e0 100644 --- a/jasmin_services/views/service_details.py +++ b/jasmin_services/views/service_details.py @@ -2,7 +2,6 @@ import string from datetime import date -import django.contrib.auth.mixins from django.db.models import Q from .. import models diff --git a/pyproject.toml b/pyproject.toml index f65a51d..c158fcf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,10 +53,12 @@ types-python-dateutil = "^2.8.19.12" django-stubs = {extras = ["compatible-mypy"], version = "^5.0.0"} pylint-django = "^2.5.5" bandit = "^1.7.8" +tblib = "^3.0.0" [tool.poetry.group.test.dependencies] django-oauth-toolkit = "^2.3.0" coverage = "^7.4.4" +tblib = "^3.0.0" [tool.black] line-length = 100