Skip to content

Commit

Permalink
feat[bckend-RBAS-ABAS]:Added Role based permission system and integra…
Browse files Browse the repository at this point in the history
…ted action based access for services.
  • Loading branch information
shikharpa committed Apr 29, 2024
1 parent 1454f00 commit 1f8b455
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 41 deletions.
2 changes: 1 addition & 1 deletion services/api/db/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class ProfileAPIView(APIView):
permission_classes = [IsAuthenticated, HasRolePermission]
permission_name = 'calender'
service_name = 'calender'

def get(self, request):
try:
Expand Down
81 changes: 66 additions & 15 deletions services/api/permission/models.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,99 @@
from django.db import models
from rest_framework.views import APIView

class CustomPermission(models.Model):
def get_http_methods():
# Get all possible HTTP methods from the rest_framework.views module
return [(method, method) for method in APIView.http_method_names]

class HTTPMethod(models.Model):
name = models.CharField(max_length=10, unique=True, choices=get_http_methods())
def __str__(self):
return self.name
class Meta:
verbose_name = 'http_method'
verbose_name_plural = 'http_methods'
db_table = "http_method"

class Service(models.Model):
name = models.CharField(max_length=100, unique=True)
description = models.TextField(blank=True)
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')
parent_service = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')

def __str__(self):
return self.name

class Meta:
verbose_name = 'Custom Permission'
verbose_name_plural = 'Custom Permissions'
db_table = "custom_permission"
verbose_name = 'Service'
verbose_name_plural = 'Services'
db_table = "service"

def get_all_children(self):
children = list(self.children.all())
for child in self.children.all():
children.extend(child.get_all_children())
return children

def get_all_parents(self):
parents = []
parent = self.parent_service
while parent is not None:
parents.insert(0, parent)
parent = parent.parent_service
parents.insert(0, self) # Insert the current service at the beginning
return parents

class CustomPermission(models.Model):
name = models.CharField(max_length=100, unique=True)
service = models.ForeignKey(Service, on_delete=models.CASCADE)
http_methods = models.ManyToManyField(HTTPMethod, blank=True)

def __str__(self):
return self.name

class Meta:
verbose_name = 'CustomPermission'
verbose_name_plural = 'CustomPermissions'
db_table = "CustomPermission"

def add_http_methods(self, http_methods):
for http_method in http_methods:
if HTTPMethod.objects.filter(name=http_method).exists():
http_method = HTTPMethod.objects.get(name=HTTPMethod)
self.http_methods.add(http_method)
else:
raise ValueError("HTTP method does not exist.")

class Role(models.Model):
name = models.CharField(max_length=100, unique=True)
services = models.ManyToManyField(Service, blank=True)
custom_permissions = models.ManyToManyField(CustomPermission, blank=True)

def __str__(self):
return self.name

def add_custom_permission(self, permission_data):
for permission in permission_data:
permission_name = permission.get('name')
for permission_name in permission_data:
if CustomPermission.objects.filter(name=permission_name).exists():
permission = CustomPermission.objects.get(name=permission_name)
self.custom_permissions.add(permission)
else:
raise ValueError("Custom Permission does not exist.")

def add_services(self, services_data):
for service_name in services_data:
if Service.objects.filter(name=service_name).exists():
service = Service.objects.get(name=service_name)
self.services.add(service)
else:
raise ValueError("Service does not exist.")

def has_permission_recursive(self, permission_name):
# Check if the current role has the permission
if self.custom_permissions.filter(name=permission_name).exists():
def has_service_recursive(self, service_name):
# Check if the current role has the services
if self.services.filter(name=service_name).exists():
return True
# Check permissions from children roles
for child_permission in self.custom_permissions.all():
for child in child_permission.get_all_children():
if child.name == permission_name:
# Check services from children roles
for child_service in self.services.all():
for child in child_service.get_all_children():
if child.name == service_name:
return True
return False

Expand Down
21 changes: 14 additions & 7 deletions services/api/permission/permission.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
from rest_framework.permissions import BasePermission
from rest_framework.exceptions import PermissionDenied

from permission.models import Service
class HasRolePermission(BasePermission):
"""
Custom permission class that checks if a user has a specific role permission.
"""
def has_permission(self, request, view):
permission_name = getattr(view, 'permission_name', None)
if not permission_name:
raise ValueError("Permission name not provided in the view.")
service_name = getattr(view, 'service_name', None)
if not service_name:
return True
if request.user.is_anonymous:
return False
if request.user.role and not request.user.role.has_permission_recursive(permission_name):
raise PermissionDenied("You do not have permission to access this view.")
return True
if request.user.role:
role = request.user.role
if role.has_service_recursive(service_name):
service = Service.objects.get(name=service_name)
parent_services = service.get_all_parents()
for parent_service in parent_services:
# Check if the requested method is in parent_service.custom_permissions.http_methods
if request.method in [http_method.name for permission in role.custom_permissions.filter(service__name=parent_service.name) for http_method in permission.http_methods.all()]:
return True
raise PermissionDenied("You do not have permission to access this view.")
40 changes: 30 additions & 10 deletions services/api/permission/serializers.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
from rest_framework import serializers
from django.contrib.auth.models import User
from permission.models import Role, CustomPermission
from permission.models import Role, CustomPermission, Service, HTTPMethod

class CustomPermissionSerializer(serializers.ModelSerializer):
parent_name = serializers.CharField(write_only=True, required=False)
class HTTPMethodSerializer(serializers.ModelSerializer):
class Meta:
model = HTTPMethod
fields = '__all__'

class ServiceSerializer(serializers.ModelSerializer):
parent_service = serializers.CharField(write_only=True, required=False)

class Meta:
model = CustomPermission
fields = ['id', 'name', 'description', 'parent_name']
model = Service
fields = '__all__'

def create(self, validated_data):
parent_name = validated_data.pop('parent_name', None)
if parent_name:
parent = CustomPermission.objects.get(name=parent_name)
validated_data['parent'] = parent
parent_service = validated_data.pop('parent_service', None)
if parent_service:
parent = Service.objects.get(name=parent_service)
validated_data['parent_service'] = parent
return super().create(validated_data)

class CustomPermissionSerializer(serializers.ModelSerializer):
service = serializers.CharField(write_only=True, required=False)
http_methods = HTTPMethodSerializer(many=True, required=False, allow_empty=True)
class Meta:
model = CustomPermission
fields = '__all__'

def create(self, validated_data):
service = validated_data.pop('service', None)
if service:
service = Service.objects.get(name=service)
validated_data['service'] = service
return super().create(validated_data)

class RoleSerializer(serializers.ModelSerializer):
services = ServiceSerializer(many=True, required=False, allow_empty=True)
custom_permissions = CustomPermissionSerializer(many=True, required=False, allow_empty=True)

class Meta:
model = Role
fields = ('id', 'name', 'custom_permissions')
fields = '__all__'

6 changes: 5 additions & 1 deletion services/api/permission/urls.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from django.urls import path
from permission.views import CustomPermissionViewSet, RoleViewSet, AssignRoleToUserView
from permission.views import CustomPermissionViewSet, RoleViewSet, AssignRoleToUserView, ServiceViewSet, HTTPMethodViewSet

urlpatterns = [
path('permissions/', CustomPermissionViewSet.as_view({'get': 'list', 'post': 'create'}), name='permissions-list'),
path('permissions/<int:pk>/', CustomPermissionViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='permission-detail'),
path('roles/', RoleViewSet.as_view({'get': 'list', 'post': 'create'}), name='roles-list'),
path('roles/<int:pk>/', RoleViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='role-detail'),
path('services/', ServiceViewSet.as_view({'get': 'list', 'post': 'create'}), name='services-list'),
path('services/<int:pk>/', ServiceViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='service-detail'),
path('http-methods/', HTTPMethodViewSet.as_view({'get': 'list', 'post': 'create'}), name='http-methods-list'),
path('http-methods/<int:pk>/', HTTPMethodViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='http-method-detail'),
path('users/<int:user_id>/assign-role/', AssignRoleToUserView.as_view(), name='assign-role')
]
53 changes: 46 additions & 7 deletions services/api/permission/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,81 @@
from rest_framework.views import APIView
from django.db import transaction
from db.models import User
from permission.models import CustomPermission, Role
from permission.serializers import CustomPermissionSerializer, RoleSerializer
from permission.models import CustomPermission, Role, HTTPMethod, Service
from permission.serializers import CustomPermissionSerializer, RoleSerializer, HTTPMethodSerializer, ServiceSerializer
from db.serializers import UserProfileSerialiser

class HTTPMethodViewSet(viewsets.ModelViewSet):
queryset = HTTPMethod.objects.all()
serializer_class = HTTPMethodSerializer

class ServiceViewSet(viewsets.ModelViewSet):
queryset = Service.objects.all()
serializer_class = ServiceSerializer

class CustomPermissionViewSet(viewsets.ModelViewSet):
queryset = CustomPermission.objects.all()
serializer_class = CustomPermissionSerializer
# permission_classes = [IsAdminUser] # Only authenticated users can manage permissions
def create(self, request, *args, **kwargs):
http_methods = request.data.pop('http_methods', [])
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
role = serializer.save()
role.add_http_methods(http_methods)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
http_methods = request.data.pop('http_methods', [])
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
self.perform_update(serializer)
instance.add_http_methods(http_methods)
return Response(serializer.data)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

class RoleViewSet(viewsets.ModelViewSet):
queryset = Role.objects.all()
serializer_class = RoleSerializer

def create(self, request, *args, **kwargs):
custom_permissions_data = request.data.pop('custom_permissions', [])
permissions_data = request.data.pop('permissions_data', [])
services_data = request.data.pop('services_data', [])
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
role = serializer.save()
role.add_custom_permission(custom_permissions_data)
role.add_custom_permission(permissions_data)
role.add_services(services_data)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
custom_permissions_data = request.data.pop('custom_permissions', [])
permissions_data = request.data.pop('permissions_data', [])
services_data = request.data.pop('services_data', [])
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
self.perform_update(serializer)
instance.add_custom_permission(custom_permissions_data)
instance.add_custom_permission(permissions_data)
instance.add_services(services_data)
return Response(serializer.data)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
Expand Down

0 comments on commit 1f8b455

Please sign in to comment.