Skip to content

Commit

Permalink
BE: RBAC classes refactoring (#116)
Browse files Browse the repository at this point in the history
Co-authored-by: iliax <[email protected]>
  • Loading branch information
iliax and iliax authored Feb 18, 2024
1 parent ccf3103 commit 47a3780
Show file tree
Hide file tree
Showing 26 changed files with 571 additions and 654 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AccessController implements AuthorizationApi {
private final AccessControlService accessControlService;

public Mono<ResponseEntity<AuthenticationInfoDTO>> getUserAuthInfo(ServerWebExchange exchange) {
Mono<List<UserPermissionDTO>> permissions = accessControlService.getUser()
Mono<List<UserPermissionDTO>> permissions = AccessControlService.getUser()
.map(user -> accessControlService.getRoles()
.stream()
.filter(role -> user.groups().contains(role.getName()))
Expand Down Expand Up @@ -64,9 +64,9 @@ private List<UserPermissionDTO> mapPermissions(List<Permission> permissions, Lis
dto.setClusters(clusters);
dto.setResource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()));
dto.setValue(permission.getValue());
dto.setActions(permission.getActions()
dto.setActions(permission.getParsedActions()
.stream()
.map(String::toUpperCase)
.map(p -> p.name().toUpperCase())
.map(this::mapAction)
.filter(Objects::nonNull)
.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(id)
.consumerGroupActions(DELETE)
.consumerGroupActions(id, DELETE)
.operationName("deleteConsumerGroup")
.build();

Expand All @@ -66,8 +65,7 @@ public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clu
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(consumerGroupId)
.consumerGroupActions(VIEW)
.consumerGroupActions(consumerGroupId, VIEW)
.operationName("getConsumerGroup")
.build();

Expand All @@ -84,8 +82,7 @@ public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(Strin
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.topicActions(topicName, TopicAction.VIEW)
.operationName("getTopicConsumerGroups")
.build();

Expand Down Expand Up @@ -142,9 +139,8 @@ public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName,
return resetDto.flatMap(reset -> {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(reset.getTopic())
.topicActions(TopicAction.VIEW)
.consumerGroupActions(RESET_OFFSETS)
.topicActions(reset.getTopic(), TopicAction.VIEW)
.consumerGroupActions(group, RESET_OFFSETS)
.operationName("resetConsumerGroupOffsets")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectors")
.build();

Expand All @@ -73,8 +72,7 @@ public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, St

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
.connectActions(connectName, ConnectAction.CREATE)
.operationName("createConnector")
.build();

Expand All @@ -91,9 +89,7 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connector(connectorName)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnector")
.build();

Expand All @@ -110,8 +106,7 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.DELETE)
.connectActions(connectName, ConnectAction.DELETE)
.operationName("deleteConnector")
.operationParams(Map.of(CONNECTOR_NAME, connectName))
.build();
Expand All @@ -133,7 +128,6 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
) {
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("getAllConnectors")
.build();

Expand All @@ -143,7 +137,6 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(

Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
.filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
.sort(comparator);

return Mono.just(ResponseEntity.ok(job))
Expand All @@ -158,8 +151,7 @@ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clust

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorConfig")
.build();

Expand All @@ -178,8 +170,7 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("setConnectorConfig")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -205,8 +196,7 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(connectActions)
.connectActions(connectName, connectActions)
.operationName("updateConnectorState")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -225,8 +215,7 @@ public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorTasks")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -245,8 +234,7 @@ public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, Strin

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
.connectActions(connectName, ConnectAction.VIEW, ConnectAction.RESTART)
.operationName("restartConnectorTask")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
Expand All @@ -264,8 +252,7 @@ public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(

var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connectActions(connectName, ConnectAction.VIEW)
.operationName("getConnectorPlugins")
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public Mono<ResponseEntity<Void>> deleteTopicMessages(

var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_DELETE)
.topicActions(topicName, MESSAGES_DELETE)
.build();

return validateAccess(context).<ResponseEntity<Void>>then(
Expand Down Expand Up @@ -89,8 +88,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.topicActions(topicName, MESSAGES_READ)
.operationName("getTopicMessages");

if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
Expand Down Expand Up @@ -127,8 +125,7 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(

var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_PRODUCE)
.topicActions(topicName, MESSAGES_PRODUCE)
.operationName("sendTopicMessages")
.build();

Expand Down Expand Up @@ -174,8 +171,7 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.topicActions(topicName, TopicAction.VIEW)
.operationName("getSerdes")
.build();

Expand Down
64 changes: 29 additions & 35 deletions api/src/main/java/io/kafbat/ui/controller/SchemasController.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibil
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("checkSchemaCompatibility")
.build();

Expand All @@ -72,31 +71,31 @@ public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibil
public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.CREATE)
.operationName("createNewSchema")
.build();

return validateAccess(context).then(
newSchemaSubjectMono.flatMap(newSubject ->
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
newSubject.getSubject(),
kafkaSrMapper.fromDto(newSubject)
)
).map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
).doOnEach(sig -> audit(context, sig));
return newSchemaSubjectMono.flatMap(newSubject -> {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(newSubject.getSubject(), SchemaAction.CREATE)
.operationName("createNewSchema")
.build();
return validateAccess(context).then(
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
newSubject.getSubject(),
kafkaSrMapper.fromDto(newSubject)
))
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig));
}
);
}

@Override
public Mono<ResponseEntity<Void>> deleteLatestSchema(
String clusterName, String subject, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subject, SchemaAction.DELETE)
.operationName("deleteLatestSchema")
.build();

Expand All @@ -112,8 +111,7 @@ public Mono<ResponseEntity<Void>> deleteSchema(
String clusterName, String subject, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subject, SchemaAction.DELETE)
.operationName("deleteSchema")
.build();

Expand All @@ -129,8 +127,7 @@ public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.DELETE)
.schemaActions(subjectName, SchemaAction.DELETE)
.operationName("deleteSchemaByVersion")
.build();

Expand All @@ -146,8 +143,7 @@ public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
String clusterName, String subjectName, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subjectName, SchemaAction.VIEW)
.operationName("getAllVersionsBySubject")
.build();

Expand Down Expand Up @@ -175,8 +171,7 @@ public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("getLatestSchema")
.build();

Expand All @@ -192,8 +187,7 @@ public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
String clusterName, String subject, Integer version, ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.schemaActions(subject, SchemaAction.VIEW)
.operationName("getSchemaByVersion")
.operationParams(Map.of("subject", subject, "version", version))
.build();
Expand Down Expand Up @@ -248,7 +242,7 @@ public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
.schemaGlobalCompatChange()
.operationName("updateGlobalSchemaCompatibilityLevel")
.build();

Expand All @@ -268,16 +262,16 @@ public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
ServerWebExchange exchange) {

var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.EDIT)
.schemaActions(subject, SchemaAction.EDIT)
.operationName("updateSchemaCompatibilityLevel")
.operationParams(Map.of("subject", subject))
.build();

return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
return compatibilityLevelMono.flatMap(compatibilityLevelDTO ->
validateAccess(context).then(
schemaRegistryService.updateSchemaCompatibility(
getCluster(clusterName),
subject,
Expand Down
Loading

0 comments on commit 47a3780

Please sign in to comment.