Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ExecutionEnvironment model so object-level roles work with DAB RBAC system #15289

Merged
merged 5 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions awx/main/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,12 +1387,11 @@ def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
class ExecutionEnvironmentAccess(BaseAccess):
"""
I can see an execution environment when:
- I'm a superuser
- I'm a member of the same organization
- it is a global ExecutionEnvironment
- I can see its organization
- It is a global ExecutionEnvironment
I can create/change an execution environment when:
- I'm a superuser
- I'm an admin for the organization(s)
- I have an organization or object role that gives access
"""

model = ExecutionEnvironment
Expand All @@ -1416,15 +1415,15 @@ def can_change(self, obj, data):
raise PermissionDenied
if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED:
if not self.user.has_obj_perm(obj, 'change'):
raise PermissionDenied
return False
else:
if self.user not in obj.organization.execution_environment_admin_role:
raise PermissionDenied
if data and 'organization' in data:
new_org = get_object_from_data('organization', Organization, data, obj=obj)
if not new_org or self.user not in new_org.execution_environment_admin_role:
return False
return self.check_related('organization', Organization, data, obj=obj, mandatory=True, role_field='execution_environment_admin_role')
return self.check_related('organization', Organization, data, obj=obj, role_field='execution_environment_admin_role')

def can_delete(self, obj):
if obj.managed:
Expand Down
26 changes: 26 additions & 0 deletions awx/main/migrations/0195_EE_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 4.2.6 on 2024-06-20 15:55

from django.db import migrations


def delete_execution_environment_read_role(apps, schema_editor):
permission_classes = [apps.get_model('auth', 'Permission'), apps.get_model('dab_rbac', 'DABPermission')]
for permission_cls in permission_classes:
ee_read_perm = permission_cls.objects.filter(codename='view_executionenvironment').first()
if ee_read_perm:
ee_read_perm.delete()


class Migration(migrations.Migration):

dependencies = [
('main', '0194_alter_inventorysource_source_and_more'),
]

operations = [
migrations.AlterModelOptions(
name='executionenvironment',
options={'default_permissions': ('add', 'change', 'delete'), 'ordering': ('-created',)},
),
migrations.RunPython(delete_execution_environment_read_role, migrations.RunPython.noop),
]
3 changes: 1 addition & 2 deletions awx/main/migrations/_dab_rbac.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ def get_permissions_for_role(role_field, children_map, apps):

# more special cases for those same above special org-level roles
if role_field.name == 'auditor_role':
for codename in ('view_notificationtemplate', 'view_executionenvironment'):
perm_list.append(Permission.objects.get(codename=codename))
perm_list.append(Permission.objects.get(codename='view_notificationtemplate'))

return perm_list

Expand Down
17 changes: 17 additions & 0 deletions awx/main/models/execution_environments.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from django.db import models
from django.utils.translation import gettext_lazy as _

from rest_framework.exceptions import ValidationError

from awx.api.versioning import reverse
from awx.main.models.base import CommonModel
from awx.main.validators import validate_container_image_name
Expand All @@ -12,6 +14,8 @@
class ExecutionEnvironment(CommonModel):
class Meta:
ordering = ('-created',)
# Remove view permission, as a temporary solution, defer to organization read permission
default_permissions = ('add', 'change', 'delete')

PULL_CHOICES = [
('always', _("Always pull container before running.")),
Expand Down Expand Up @@ -53,3 +57,16 @@ class Meta:

def get_absolute_url(self, request=None):
return reverse('api:execution_environment_detail', kwargs={'pk': self.pk}, request=request)

def validate_role_assignment(self, actor, role_definition):
if self.managed:
raise ValidationError({'object_id': _('Can not assign object roles to managed Execution Environments')})
if self.organization_id is None:
raise ValidationError({'object_id': _('Can not assign object roles to global Execution Environments')})

if actor._meta.model_name == 'user' and (not actor.has_obj_perm(self.organization, 'view')):
raise ValidationError({'user': _('User must have view permission to Execution Environment organization')})
if actor._meta.model_name == 'team':
organization_cls = self._meta.get_field('organization').related_model
if self.orgaanization not in organization_cls.access_qs(actor, 'view'):
raise ValidationError({'team': _('Team must have view permission to Execution Environment organization')})
7 changes: 7 additions & 0 deletions awx/main/tests/functional/test_migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,10 @@ def test_migrate_DAB_RBAC(self, migrator):
# Test special cases in managed role creation
assert not RoleDefinition.objects.filter(name='Organization Team Admin').exists()
assert not RoleDefinition.objects.filter(name='Organization InstanceGroup Admin').exists()

# Test that a removed EE model permission has been deleted
new_state = migrator.apply_tested_migration(
('main', '0195_EE_permissions'),
)
DABPermission = new_state.apps.get_model('dab_rbac', 'DABPermission')
assert not DABPermission.objects.filter(codename='view_executionenvironment').exists()
107 changes: 107 additions & 0 deletions awx/main/tests/functional/test_rbac_execution_environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import pytest

from django.contrib.contenttypes.models import ContentType

from awx.main.access import ExecutionEnvironmentAccess
from awx.main.models import ExecutionEnvironment, Organization
from awx.main.models.rbac import get_role_codenames

from awx.api.versioning import reverse
from django.urls import reverse as django_reverse

from ansible_base.rbac.models import RoleDefinition


@pytest.fixture
def ee_rd():
return RoleDefinition.objects.create_from_permissions(
name='EE object admin',
permissions=['change_executionenvironment', 'delete_executionenvironment'],
content_type=ContentType.objects.get_for_model(ExecutionEnvironment),
)


@pytest.fixture
def org_ee_rd():
return RoleDefinition.objects.create_from_permissions(
name='EE org admin',
permissions=['add_executionenvironment', 'change_executionenvironment', 'delete_executionenvironment', 'view_organization'],
content_type=ContentType.objects.get_for_model(Organization),
)


@pytest.mark.django_db
def test_old_ee_role_maps_to_correct_permissions(organization):
assert set(get_role_codenames(organization.execution_environment_admin_role)) == {
'view_organization',
'add_executionenvironment',
'change_executionenvironment',
'delete_executionenvironment',
}


@pytest.fixture
def org_ee(organization):
return ExecutionEnvironment.objects.create(name='some user ee', organization=organization)


@pytest.fixture
def check_user_capabilities(get, setup_managed_roles):
def _rf(user, obj, expected):
url = reverse('api:execution_environment_list')
r = get(url, user=user, expect=200)
for item in r.data['results']:
if item['id'] == obj.pk:
assert expected == item['summary_fields']['user_capabilities']
break
else:
raise RuntimeError(f'Could not find expected object ({obj}) in EE list result: {r.data}')

return _rf


# ___ begin tests ___


@pytest.mark.django_db
def test_managed_ee_not_assignable(control_plane_execution_environment, ee_rd, rando, admin_user, post):
url = django_reverse('roleuserassignment-list')
r = post(url, {'role_definition': ee_rd.pk, 'user': rando.id, 'object_id': control_plane_execution_environment.pk}, user=admin_user, expect=400)
assert 'Can not assign object roles to managed Execution Environment' in str(r.data)


@pytest.mark.django_db
def test_org_member_required_for_assignment(org_ee, ee_rd, rando, admin_user, post):
url = django_reverse('roleuserassignment-list')
r = post(url, {'role_definition': ee_rd.pk, 'user': rando.id, 'object_id': org_ee.pk}, user=admin_user, expect=400)
assert 'User must have view permission to Execution Environment organization' in str(r.data)


@pytest.mark.django_db
def test_give_object_permission_to_ee(org_ee, ee_rd, org_member, check_user_capabilities):
access = ExecutionEnvironmentAccess(org_member)
assert access.can_read(org_ee) # by virtue of being an org member
assert not access.can_change(org_ee, {'name': 'new'})
check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False})

ee_rd.give_permission(org_member, org_ee)
assert access.can_change(org_ee, {'name': 'new'})

check_user_capabilities(org_member, org_ee, {'edit': True, 'delete': True, 'copy': False})


@pytest.mark.django_db
@pytest.mark.parametrize('style', ['new', 'old'])
def test_give_org_permission_to_ee(org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd):
access = ExecutionEnvironmentAccess(org_member)
assert not access.can_change(org_ee, {'name': 'new'})
check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False})

if style == 'new':
org_ee_rd.give_permission(org_member, organization)
assert org_member.has_obj_perm(org_ee.organization, 'add_executionenvironment') # sanity
else:
organization.execution_environment_admin_role.members.add(org_member)

assert access.can_change(org_ee, {'name': 'new'})
check_user_capabilities(org_member, org_ee, {'edit': True, 'delete': True, 'copy': True})
Loading