Skip to content

Commit

Permalink
feat[bckend-RBAS-ABAS]:Added Role based permission system and integra…
Browse files Browse the repository at this point in the history
…ted action based access for services.
  • Loading branch information
shikharpa committed May 16, 2024
1 parent 1454f00 commit ce61f17
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 43 deletions.
2 changes: 1 addition & 1 deletion services/api/db/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class ProfileAPIView(APIView):
permission_classes = [IsAuthenticated, HasRolePermission]
permission_name = 'calender'
service_name = 'calender'

def get(self, request):
try:
Expand Down
98 changes: 83 additions & 15 deletions services/api/permission/models.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,116 @@
from django.db import models

class CustomPermission(models.Model):
class MethodName(models.Model):
name = models.CharField(max_length=100, unique=True)
def __str__(self):
return self.name

class Meta:
verbose_name = 'MethodName'
verbose_name_plural = 'MethodNames'
db_table = "MethodNames"

class AssociatedViews(models.Model):
name = models.CharField(max_length=100)
methods_name = models.ManyToManyField(MethodName)
def __str__(self):
return self.name
class Meta:
verbose_name = 'AssociatedViews'
verbose_name_plural = 'AssociatedViews'
db_table = "AssociatedViews"

def add_MethodNames(self, MethodNames):
self.methods_name.clear()
for method in MethodNames:
if MethodName.objects.filter(name=method).exists():
method = MethodName.objects.get(name=method)
self.methods_name.add(method)
else:
raise ValueError("name of class view does not exist in code base.")

class Service(models.Model):
name = models.CharField(max_length=100, unique=True)
description = models.TextField(blank=True)
parent = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')
parent_service = models.ForeignKey('self', on_delete=models.CASCADE, null=True, blank=True, related_name='children')

def __str__(self):
return self.name

class Meta:
verbose_name = 'Custom Permission'
verbose_name_plural = 'Custom Permissions'
db_table = "custom_permission"
verbose_name = 'Service'
verbose_name_plural = 'Services'
db_table = "service"

def get_all_children(self):
children = list(self.children.all())
for child in self.children.all():
children.extend(child.get_all_children())
return children

def get_all_parents(self):
parents = []
parent = self.parent_service
while parent is not None:
parents.insert(0, parent)
parent = parent.parent_service
parents.insert(0, self) # Insert the current service at the beginning
return parents

class CustomPermission(models.Model):
name = models.CharField(max_length=100, unique=True)
AssociatedViews = models.ManyToManyField(AssociatedViews, blank=True)

def __str__(self):
return self.name

class Meta:
verbose_name = 'CustomPermission'
verbose_name_plural = 'CustomPermissions'
db_table = "CustomPermission"

def add_AssociatedViews(self, AccessViews):
self.AssociatedViews.clear()
for AccessView in AccessViews:
if AssociatedViews.objects.filter(name=AccessView).exists():
view = AssociatedViews.objects.get(name=AccessView)
self.AssociatedViews.add(view)
else:
raise ValueError("name of class view does not exist in code base.")

class Role(models.Model):
name = models.CharField(max_length=100, unique=True)
services = models.ManyToManyField(Service, blank=True)
custom_permissions = models.ManyToManyField(CustomPermission, blank=True)

def __str__(self):
return self.name

def add_custom_permission(self, permission_data):
for permission in permission_data:
permission_name = permission.get('name')
self.custom_permissions.clear()
for permission_name in permission_data:
if CustomPermission.objects.filter(name=permission_name).exists():
permission = CustomPermission.objects.get(name=permission_name)
self.custom_permissions.add(permission)
else:
raise ValueError("Custom Permission does not exist.")

def add_services(self, services_data):
self.services.clear()
for service_name in services_data:
if Service.objects.filter(name=service_name).exists():
service = Service.objects.get(name=service_name)
self.services.add(service)
else:
raise ValueError("Service does not exist.")

def has_permission_recursive(self, permission_name):
# Check if the current role has the permission
if self.custom_permissions.filter(name=permission_name).exists():
def has_service_recursive(self, service_name):
# Check if the current role has the services
if self.services.filter(name=service_name).exists():
return True
# Check permissions from children roles
for child_permission in self.custom_permissions.all():
for child in child_permission.get_all_children():
if child.name == permission_name:
# Check services from children roles
for child_service in self.services.all():
for child in child_service.get_all_children():
if child.name == service_name:
return True
return False

Expand Down
37 changes: 29 additions & 8 deletions services/api/permission/permission.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
from rest_framework.permissions import BasePermission
from rest_framework.exceptions import PermissionDenied

from permission.models import Service
class HasRolePermission(BasePermission):
"""
Custom permission class that checks if a user has a specific role permission.
"""
def has_permission(self, request, view):
permission_name = getattr(view, 'permission_name', None)
if not permission_name:
raise ValueError("Permission name not provided in the view.")
if request.user.is_anonymous:
view_class_name = view.__class__.__name__
service_name = getattr(view, 'service_name', None)
requested_action = getattr(view, 'action', None)
if requested_action is None:
requested_action = request.method
if not service_name:
return True
if request.user.role:
role = request.user.role
"""
we can check here if user has access to any particular service else this recursive line could be omitted
and it will still work to check api access (class name with action) with respect to role.
"""
if role.has_service_recursive(service_name):
for permission in role.custom_permissions.all():
for AssociatedView in permission.AssociatedViews.all():
if view_class_name == AssociatedView.name:
for method in AssociatedView.methods_name.all():
if requested_action.lower() == method.name:
return True
return False
return False
if request.user.role and not request.user.role.has_permission_recursive(permission_name):
raise PermissionDenied("You do not have permission to access this view.")
return True

def has_object_permission(self, request, view, obj):
# Check if the object has an owner field
if not hasattr(obj, 'owner'):
return False
# Allow access only if the request user is the object's owner
return obj.owner == request.user or request.user.is_superuser
38 changes: 27 additions & 11 deletions services/api/permission/serializers.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
from rest_framework import serializers
from django.contrib.auth.models import User
from permission.models import Role, CustomPermission
from permission.models import Role, CustomPermission, Service, AssociatedViews, MethodName

class CustomPermissionSerializer(serializers.ModelSerializer):
parent_name = serializers.CharField(write_only=True, required=False)
class MethodNameSerializer(serializers.ModelSerializer):
class Meta:
model = MethodName
fields = '__all__'

class AssociatedViewsSerializer(serializers.ModelSerializer):
methods_name = MethodNameSerializer(many=True, required=False, allow_empty=True)
class Meta:
model = CustomPermission
fields = ['id', 'name', 'description', 'parent_name']
model = AssociatedViews
fields = '__all__'

class ServiceSerializer(serializers.ModelSerializer):
parent_service = serializers.CharField(required=False)
class Meta:
model = Service
fields = '__all__'

def create(self, validated_data):
parent_name = validated_data.pop('parent_name', None)
if parent_name:
parent = CustomPermission.objects.get(name=parent_name)
validated_data['parent'] = parent
parent_service = validated_data.pop('parent_service', None)
if parent_service:
parent = Service.objects.get(name=parent_service)
validated_data['parent_service'] = parent
return super().create(validated_data)

class CustomPermissionSerializer(serializers.ModelSerializer):
AssociatedViews = AssociatedViewsSerializer(many=True, required=False, allow_empty=True)
class Meta:
model = CustomPermission
fields = '__all__'

class RoleSerializer(serializers.ModelSerializer):
services = ServiceSerializer(many=True, required=False, allow_empty=True)
custom_permissions = CustomPermissionSerializer(many=True, required=False, allow_empty=True)

class Meta:
model = Role
fields = ('id', 'name', 'custom_permissions')
fields = '__all__'

8 changes: 7 additions & 1 deletion services/api/permission/urls.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from django.urls import path
from permission.views import CustomPermissionViewSet, RoleViewSet, AssignRoleToUserView
from permission.views import CustomPermissionViewSet, RoleViewSet, AssignRoleToUserView, ServiceViewSet, AssociatedViewsViewSet, MethodNameViewSet

urlpatterns = [
path('permissions/', CustomPermissionViewSet.as_view({'get': 'list', 'post': 'create'}), name='permissions-list'),
path('permissions/<int:pk>/', CustomPermissionViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='permission-detail'),
path('roles/', RoleViewSet.as_view({'get': 'list', 'post': 'create'}), name='roles-list'),
path('roles/<int:pk>/', RoleViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='role-detail'),
path('services/', ServiceViewSet.as_view({'get': 'list', 'post': 'create'}), name='services-list'),
path('services/<int:pk>/', ServiceViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='service-detail'),
path('MethodName/', MethodNameViewSet.as_view({'get': 'list', 'post': 'create'}), name='http-methods-list'),
path('MethodName/<int:pk>/', MethodNameViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='http-method-detail'),
path('AccessControlViews/', AssociatedViewsViewSet.as_view({'get': 'list', 'post': 'create'}), name='http-methods-list'),
path('AccessControlViews/<int:pk>/', AssociatedViewsViewSet.as_view({'get': 'retrieve', 'put': 'update', 'delete': 'destroy'}), name='http-method-detail'),
path('users/<int:user_id>/assign-role/', AssignRoleToUserView.as_view(), name='assign-role')
]
85 changes: 78 additions & 7 deletions services/api/permission/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,113 @@
from rest_framework.views import APIView
from django.db import transaction
from db.models import User
from permission.models import CustomPermission, Role
from permission.serializers import CustomPermissionSerializer, RoleSerializer
from permission.models import CustomPermission, Role, AssociatedViews, Service, MethodName
from permission.serializers import CustomPermissionSerializer, RoleSerializer, AssociatedViewsSerializer, ServiceSerializer, MethodNameSerializer
from db.serializers import UserProfileSerialiser

class MethodNameViewSet(viewsets.ModelViewSet):
queryset = MethodName.objects.all()
serializer_class = MethodNameSerializer

class AssociatedViewsViewSet(viewsets.ModelViewSet):
queryset = AssociatedViews.objects.all()
serializer_class = AssociatedViewsSerializer
def create(self, request, *args, **kwargs):
MethodNames = request.data.pop('MethodNames', [])
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
AssociatedViews = serializer.save()
AssociatedViews.add_MethodNames(MethodNames)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
MethodNames = request.data.pop('MethodNames', [])
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
self.perform_update(serializer)
instance.add_MethodNames(MethodNames)
return Response(serializer.data)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

class ServiceViewSet(viewsets.ModelViewSet):
queryset = Service.objects.all()
serializer_class = ServiceSerializer

class CustomPermissionViewSet(viewsets.ModelViewSet):
queryset = CustomPermission.objects.all()
serializer_class = CustomPermissionSerializer
# permission_classes = [IsAdminUser] # Only authenticated users can manage permissions
def create(self, request, *args, **kwargs):
AccessViews = request.data.pop('AccessViews', [])
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
CustomPermission = serializer.save()
CustomPermission.add_AssociatedViews(AccessViews)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
AccessViews = request.data.pop('AccessViews', [])
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
self.perform_update(serializer)
instance.add_AssociatedViews(AccessViews)
return Response(serializer.data)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

class RoleViewSet(viewsets.ModelViewSet):
queryset = Role.objects.all()
serializer_class = RoleSerializer

def create(self, request, *args, **kwargs):
custom_permissions_data = request.data.pop('custom_permissions', [])
permissions_data = request.data.pop('permissions_data', [])
services_data = request.data.pop('services_data', [])
serializer = self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
role = serializer.save()
role.add_custom_permission(custom_permissions_data)
role.add_custom_permission(permissions_data)
role.add_services(services_data)
return Response(serializer.data, status=status.HTTP_201_CREATED)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

def update(self, request, *args, **kwargs):
custom_permissions_data = request.data.pop('custom_permissions', [])
permissions_data = request.data.pop('permissions_data', [])
services_data = request.data.pop('services_data', [])
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data)
if serializer.is_valid(raise_exception=True):
try:
with transaction.atomic():
self.perform_update(serializer)
instance.add_custom_permission(custom_permissions_data)
instance.add_custom_permission(permissions_data)
instance.add_services(services_data)
return Response(serializer.data)
except ValueError as e:
return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
Expand Down

0 comments on commit ce61f17

Please sign in to comment.