Skip to content

Commit

Permalink
Add QGIS server protected attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrunner committed Oct 4, 2024
1 parent 35bc552 commit 03c6737
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 4 deletions.
37 changes: 34 additions & 3 deletions docker/qgisserver/geomapfish_qgisserver/accesscontrol.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ def layerPermissions( # pylint: disable=invalid-name
ogc_layer_name = self.ogc_layer_name(layer)
if ogc_layer_name not in layers:
return rights
gmf_layers = self.get_layers(session)[ogc_layer_name]
gmf_layers = layers[ogc_layer_name]
finally:
session.close()
access, _ = self.get_restriction_areas(gmf_layers, roles=roles)
Expand All @@ -718,7 +718,14 @@ def authorizedLayerAttributes( # pylint: disable=invalid-name
"""
Returns the authorized layer attributes.
"""
del layer

roles = self.get_roles(self.DBSession())
if roles == "ROOT":
return attributes

assert not isinstance(roles, str) # nosec

from c2cgeoportal_commons.models.main import Role # pylint: disable=import-outside-toplevel

if self.ogcserver is None:
parameters = self.serverInterface().requestHandler().parameterMap()
Expand All @@ -728,7 +735,31 @@ def authorizedLayerAttributes( # pylint: disable=invalid-name
)
return []

return attributes
session = self.DBSession()
try:
layers = self.get_layers(session)
ogc_layer_name = self.ogc_layer_name(layer)
if ogc_layer_name not in layers:
return []
gmf_layers = layers[ogc_layer_name]
protected_attributes = []
for gmf_layer in gmf_layers:
for metadata in gmf_layer.get_metadatas("protectedAttribute"):
protected_attributes.extend(metadata.value.split(","))

attributes_access = [a for a in attributes if a not in protected_attributes]
for role in roles:
for functionality in role.functionalities:
if functionality.name == "attribute_access" and ":" in functionality.value:
layer_name, layer_protected_attributes = functionality.value.split(":", 1)
if layer_name == ogc_layer_name:
for protected_attribute in layer_protected_attributes.split(","):
if protected_attribute in attributes:
attributes_access.append(protected_attribute)
return attributes_access

finally:
session.close()

def allowToEdit(self, layer: QgsVectorLayer, feature: QgsFeature) -> bool: # pylint: disable=invalid-name
"""Are we authorize to modify the following geometry."""
Expand Down
225 changes: 224 additions & 1 deletion docker/qgisserver/tests/functional/accesscontrol_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2023, Camptocamp SA
# Copyright (c) 2018-2024, Camptocamp SA
# All rights reserved.

# This program is free software; you can redistribute it and/or modify it under the terms of the
Expand Down Expand Up @@ -187,6 +187,143 @@ def test_data(clean_dbsession):
"project": project,
}

dbsession.delete_all(
[
ogc_server1,
ogc_server2,
role1,
role2,
root,
user1,
user2,
ra1,
ra2,
public_group,
public_layer,
private_layer1,
private_layer2,
private_group3,
private_layer3,
]
)
dbsession.commit()


def _test_restricted(clean_dbsession, restricted: bool):
from c2cgeoportal_commons.models.main import (
OGCSERVER_AUTH_STANDARD,
OGCSERVER_TYPE_QGISSERVER,
Functionality,
LayerWMS,
Metadata,
OGCServer,
Role,
)
from c2cgeoportal_commons.models.static import User

DBSession = clean_dbsession # noqa: ignore=N806

dbsession = DBSession()

ogc_server1 = OGCServer(
name="qgisserver1",
type_=OGCSERVER_TYPE_QGISSERVER,
url="http://qgis",
image_type="image/png",
auth=OGCSERVER_AUTH_STANDARD,
)
dbsession.add(ogc_server1)

role1 = Role("role1")
if restricted:
role1.functionalities = [
Functionality("attribute_access", "ows_private_layer1:protected1,protected2"),
Functionality("attribute_access", "ows_private_layer2:protected1,protected2"),
]
dbsession.add(role1)

root = User("root")
root.id = 0
user1 = User("user1", roles=[role1])
dbsession.add_all((root, user1))

project = QgsProject.instance()

for node in [
{
"name": "root",
"type": "group",
"children": [
{
"name": "ows_private_group1",
"type": "group",
"children": [{"name": "ows_private_layer1", "type": "layer"}],
},
{"name": "ows_private_layer2", "type": "layer"},
],
}
]:
add_node_in_qgis_project(project, project.layerTreeRoot(), node)

private_layer1 = LayerWMS(name="private_layer2", layer="ows_private_layer2", public=False)
private_layer1.ogc_server = ogc_server1
if restricted:
private_layer1.metadata = [Metadata("protectedAttribute", "protected1,protected2,protected3")]

private_group1 = LayerWMS(name="private_group1", layer="ows_private_group1", public=False)
private_group1.ogc_server = ogc_server1
if restricted:
private_group1.metadata = [Metadata("protectedAttribute", "protected1,protected2,protected3")]

dbsession.add_all(
(
private_layer1,
private_group1,
)
)

dbsession.flush()
dbsession.commit()
dbsession.close()

return {
# "ogc_servers": ogc_servers,
"role": role1,
"user": user1,
"all": [role1, root, user1, ogc_server1, private_layer1, private_group1],
# "layers": layers,
# "restriction_areas": restriction_areas,
"project": project,
}


@pytest.fixture(scope="module")
def test_data_restricted_ref(clean_dbsession):
from c2cgeoportal_commons.models.main import LayerWMS, OGCServer, Role
from c2cgeoportal_commons.models.static import User

test_data = _test_restricted(clean_dbsession, False)
yield test_data
DBSession = clean_dbsession # noqa: ignore=N806

dbsession = DBSession()
dbsession.delete_all(test_data["all"])
dbsession.commit()


@pytest.fixture(scope="module")
def test_data_restricted(clean_dbsession):
from c2cgeoportal_commons.models.main import Functionality, LayerWMS, Metadata, OGCServer, Role
from c2cgeoportal_commons.models.static import User

test_data = _test_restricted(clean_dbsession, True)
yield test_data
DBSession = clean_dbsession # noqa: ignore=N806

dbsession = DBSession()
dbsession.delete_all(test_data["all"])
dbsession.commit()


@pytest.fixture(scope="function")
def wms_use_layer_ids(test_data):
Expand Down Expand Up @@ -548,3 +685,89 @@ def test_init(self, server_iface, test_data):

set_request_parameters(server_iface, {})
assert plugin.get_ogcserver_accesscontrol().ogcserver.name == "qgisserver2"


@pytest.mark.usefixtures(
"qgs_access_control_filter",
"auto_single_ogc_server_env",
"test_data_restricted",
)
class TestRestrictedAttributeRef:
def test_restricted_attribute(self, server_iface, test_data_restricted_ref):
plugin = GeoMapFishAccessControl(server_iface)

set_request_parameters(server_iface, {"USER_ID": "0"})

assert plugin.authorizedLayerAttributes(
test_data_restricted_ref["project"].mapLayersByName("oms_private_layer2"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]
assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_grpoup1"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]

set_request_parameters(server_iface, {"ROLE_IDS": test_data_restricted["role"].id})

assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_layer2"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]
assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_grpoup1"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]

set_request_parameters(server_iface, {})

assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_layer2"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]
assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_grpoup1"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]


@pytest.mark.usefixtures(
"qgs_access_control_filter",
"auto_single_ogc_server_env",
"test_data_restricted",
)
class TestRestrictedAttribute:
def test_restricted_attribute(self, server_iface, test_data_restricted):
plugin = GeoMapFishAccessControl(server_iface)

set_request_parameters(server_iface, {"USER_ID": "0"})

assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_layer2"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]
assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_grpoup1"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2", "protected3"]

set_request_parameters(server_iface, {"ROLE_IDS": test_data_restricted["role"].id})

assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_layer2"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2"]
assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_grpoup1"),
["public", "protected1", "protected2", "protected3"],
) == ["public", "protected1", "protected2"]

set_request_parameters(server_iface, {})

assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_layer2"),
["public", "protected1", "protected2", "protected3"],
) == ["public"]
assert plugin.authorizedLayerAttributes(
test_data_restricted["project"].mapLayersByName("oms_private_grpoup1"),
["public", "protected1", "protected2", "protected3"],
) == ["public"]
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ vars:
List of attribute names which have enumerated attribute values (for filters purpose).
relevant_for:
- layer_wms
- name: protectedAttributes
type: list
description: >
For QGIS server: List of attribute names that's protected, to be used with the `attribute_access` functionality.
relevant_for:
- layer_wms
# WMTS layers
- name: ogcServer
description: >
Expand Down Expand Up @@ -863,6 +869,13 @@ vars:
Name of a panel to open upon loading an application.
relevant_for:
- role
- name: attribute_access
description: >
For QGIS server: List of protected attribute names that we can access, to be used with the `protectedAttributes` metadata.
Syntax: <WMS layer name>:<attribute name>,<attribute name>,...
relevant_for:
- role

getitfixed:
enabled: false
Expand Down

0 comments on commit 03c6737

Please sign in to comment.