Skip to content

Commit

Permalink
Merge branch 'kafbat:main' into Feature/Azure-entra
Browse files Browse the repository at this point in the history
  • Loading branch information
tnewman-at-gm authored Aug 26, 2024
2 parents 3324341 + 273e64c commit 9871378
Show file tree
Hide file tree
Showing 16 changed files with 120 additions and 93 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/e2e-run.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ jobs:
run: |
mkdir -p ./e2e-tests/target/selenoid-results/video
mkdir -p ./e2e-tests/target/selenoid-results/logs
docker-compose -f ./e2e-tests/selenoid/selenoid-ci.yaml up -d
docker-compose -f ./documentation/compose/e2e-tests.yaml up -d
docker compose -f ./e2e-tests/selenoid/selenoid-ci.yaml up -d
docker compose -f ./documentation/compose/e2e-tests.yaml up -d
- name: Dump Docker logs on failure
if: failure()
Expand Down
39 changes: 20 additions & 19 deletions api/src/main/java/io/kafbat/ui/controller/AccessController.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,34 @@ public Mono<ResponseEntity<AuthenticationInfoDTO>> getUserAuthInfo(ServerWebExch
.map(SecurityContext::getAuthentication)
.map(Principal::getName);

var builder = AuthenticationInfoDTO.builder()
.rbacEnabled(accessControlService.isRbacEnabled());

return userName
.zipWith(permissions)
.map(data -> {
var dto = new AuthenticationInfoDTO(accessControlService.isRbacEnabled());
dto.setUserInfo(new UserInfoDTO(data.getT1(), data.getT2()));
return dto;
})
.switchIfEmpty(Mono.just(new AuthenticationInfoDTO(accessControlService.isRbacEnabled())))
.map(data -> (AuthenticationInfoDTO) builder
.userInfo(new UserInfoDTO(data.getT1(), data.getT2()))
.build()
)
.switchIfEmpty(Mono.just(builder.build()))
.map(ResponseEntity::ok);
}

private List<UserPermissionDTO> mapPermissions(List<Permission> permissions, List<String> clusters) {
return permissions
.stream()
.map(permission -> {
UserPermissionDTO dto = new UserPermissionDTO();
dto.setClusters(clusters);
dto.setResource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()));
dto.setValue(permission.getValue());
dto.setActions(permission.getParsedActions()
.stream()
.map(p -> p.name().toUpperCase())
.map(this::mapAction)
.filter(Objects::nonNull)
.toList());
return dto;
})
.map(permission -> (UserPermissionDTO) UserPermissionDTO.builder()
.clusters(clusters)
.resource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()))
.value(permission.getValue())
.actions(permission.getParsedActions()
.stream()
.map(p -> p.name().toUpperCase())
.map(this::mapAction)
.filter(Objects::nonNull)
.toList())
.build()
)
.toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String cluster
}

@Override
@SuppressWarnings("unchecked")
public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
String pipeId,
ServerWebExchange exchange) {
Expand Down
29 changes: 12 additions & 17 deletions api/src/main/java/io/kafbat/ui/controller/TopicsController.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package io.kafbat.ui.controller;

import static io.kafbat.ui.model.rbac.permission.TopicAction.ANALYSIS_RUN;
import static io.kafbat.ui.model.rbac.permission.TopicAction.ANALYSIS_VIEW;
import static io.kafbat.ui.model.rbac.permission.TopicAction.CREATE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.DELETE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.EDIT;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static io.kafbat.ui.model.rbac.permission.TopicAction.VIEW;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -272,7 +273,7 @@ public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicN

var context = AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
.topicActions(topicName, ANALYSIS_RUN)
.operationName("analyzeTopic")
.build();

Expand All @@ -288,7 +289,7 @@ public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
.topicActions(topicName, ANALYSIS_RUN)
.operationName("cancelTopicAnalysis")
.build();

Expand All @@ -306,7 +307,7 @@ public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterNam

var context = AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
.topicActions(topicName, ANALYSIS_VIEW)
.operationName("getTopicAnalysis")
.build();

Expand Down Expand Up @@ -350,18 +351,12 @@ private Comparator<InternalTopic> getComparatorForTopic(
if (orderBy == null) {
return defaultComparator;
}
switch (orderBy) {
case TOTAL_PARTITIONS:
return Comparator.comparing(InternalTopic::getPartitionCount);
case OUT_OF_SYNC_REPLICAS:
return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
case REPLICATION_FACTOR:
return Comparator.comparing(InternalTopic::getReplicationFactor);
case SIZE:
return Comparator.comparing(InternalTopic::getSegmentSize);
case NAME:
default:
return defaultComparator;
}
return switch (orderBy) {
case TOTAL_PARTITIONS -> Comparator.comparing(InternalTopic::getPartitionCount);
case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor);
case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize);
default -> defaultComparator;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public enum TopicAction implements PermissibleAction {
MESSAGES_READ(VIEW),
MESSAGES_PRODUCE(VIEW),
MESSAGES_DELETE(VIEW, EDIT),
ANALYSIS_VIEW(VIEW),
ANALYSIS_RUN(VIEW, ANALYSIS_VIEW),

;

Expand Down
67 changes: 33 additions & 34 deletions api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
Expand All @@ -32,7 +31,6 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
@Test
void shouldNotFoundWhenNoSuchConsumerGroupId() {
String groupId = "groupA";
String expError = "The group id does not exist";
webTestClient
.delete()
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
Expand All @@ -47,12 +45,13 @@ void shouldOkWhenConsumerGroupIsNotActive() {

//Create a consumer and subscribe to the topic
String groupId = UUID.randomUUID().toString();
val consumer = createTestConsumerWithGroupId(groupId);
consumer.subscribe(List.of(topicName));
consumer.poll(Duration.ofMillis(100));
try (val consumer = createTestConsumerWithGroupId(groupId)) {
consumer.subscribe(List.of(topicName));
consumer.poll(Duration.ofMillis(100));

//Unsubscribe from all topics to be able to delete this consumer
consumer.unsubscribe();
//Unsubscribe from all topics to be able to delete this consumer
consumer.unsubscribe();
}

//Delete the consumer when it's INACTIVE and check
webTestClient
Expand All @@ -69,24 +68,24 @@ void shouldBeBadRequestWhenConsumerGroupIsActive() {

//Create a consumer and subscribe to the topic
String groupId = UUID.randomUUID().toString();
val consumer = createTestConsumerWithGroupId(groupId);
consumer.subscribe(List.of(topicName));
consumer.poll(Duration.ofMillis(100));
try (val consumer = createTestConsumerWithGroupId(groupId)) {
consumer.subscribe(List.of(topicName));
consumer.poll(Duration.ofMillis(100));

//Try to delete the consumer when it's ACTIVE
String expError = "The group is not empty";
webTestClient
.delete()
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
.exchange()
.expectStatus()
.isBadRequest();
//Try to delete the consumer when it's ACTIVE
webTestClient
.delete()
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
.exchange()
.expectStatus()
.isBadRequest();
}
}

@Test
void shouldReturnConsumerGroupsWithPagination() throws Exception {
try (var groups1 = startConsumerGroups(3, "cgPageTest1");
var groups2 = startConsumerGroups(2, "cgPageTest2")) {
try (var ignored = startConsumerGroups(3, "cgPageTest1");
var ignored1 = startConsumerGroups(2, "cgPageTest2")) {
webTestClient
.get()
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=3&search=cgPageTest", LOCAL)
Expand Down Expand Up @@ -114,19 +113,19 @@ void shouldReturnConsumerGroupsWithPagination() throws Exception {
});

webTestClient
.get()
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
+ "=cgPageTest&orderBy=NAME&sortOrder=DESC", LOCAL)
.exchange()
.expectStatus()
.isOk()
.expectBody(ConsumerGroupsPageResponseDTO.class)
.value(page -> {
assertThat(page.getPageCount()).isEqualTo(1);
assertThat(page.getConsumerGroups().size()).isEqualTo(5);
assertThat(page.getConsumerGroups())
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
});
.get()
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
+ "=cgPageTest&orderBy=NAME&sortOrder=DESC", LOCAL)
.exchange()
.expectStatus()
.isOk()
.expectBody(ConsumerGroupsPageResponseDTO.class)
.value(page -> {
assertThat(page.getPageCount()).isEqualTo(1);
assertThat(page.getConsumerGroups().size()).isEqualTo(5);
assertThat(page.getConsumerGroups())
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
});

webTestClient
.get()
Expand Down Expand Up @@ -156,7 +155,7 @@ private Closeable startConsumerGroups(int count, String consumerGroupPrefix) {
return consumer;
})
.limit(count)
.collect(Collectors.toList());
.toList();
return () -> {
consumers.forEach(KafkaConsumer::close);
deleteTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ void testBase64DecodingWorks() {
}

private TopicMessageDTO msg() {
return new TopicMessageDTO(1, -1L, OffsetDateTime.now());
return TopicMessageDTO.builder()
.partition(1)
.offset(-1L)
.timestamp(OffsetDateTime.now())
.build();
}
}
29 changes: 18 additions & 11 deletions api/src/test/java/io/kafbat/ui/service/acl/AclCsvTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kafbat.ui.exception.ValidationException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
Expand All @@ -15,6 +16,8 @@
import org.apache.kafka.common.resource.ResourceType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

class AclCsvTest {
Expand All @@ -29,22 +32,26 @@ class AclCsvTest {
);

@ParameterizedTest
@ValueSource(strings = {
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost",

//without header
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ "\n"
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
+ "\n"
})
@MethodSource
void parsesValidInputCsv(String csvString) {
Collection<AclBinding> parsed = AclCsv.parseCsv(csvString);
assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
}

private static Stream<Arguments> parsesValidInputCsv() {
return Stream.of(
Arguments.of(
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host" + System.lineSeparator()
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"),
Arguments.of(
//without header
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
+ System.lineSeparator()
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
+ System.lineSeparator()));
}

@ParameterizedTest
@ValueSource(strings = {
// columns > 7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ void testSyncAclWithAclCsv() {

aclsService.syncAclWithAclCsv(
CLUSTER,
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host" + System.lineSeparator()
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
+ "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
).block();

Expand Down
11 changes: 11 additions & 0 deletions contract/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${org.projectlombok.version}</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -100,6 +105,12 @@
<useTags>true</useTags>
<useSpringBoot3>true</useSpringBoot3>
<dateLibrary>java8</dateLibrary>
<generatedConstructorWithRequiredArgs>false</generatedConstructorWithRequiredArgs>
<additionalModelTypeAnnotations>
@lombok.experimental.SuperBuilder
@lombok.NoArgsConstructor
@lombok.AllArgsConstructor
</additionalModelTypeAnnotations>
</configOptions>
</configuration>
</execution>
Expand Down
1 change: 1 addition & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3472,6 +3472,7 @@ components:
- UNASSIGNED
- TASK_FAILED
- RESTARTING
- STOPPED

ConnectorAction:
type: string
Expand Down
1 change: 1 addition & 0 deletions contract/src/main/resources/swagger/kafka-connect-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ components:
- PAUSED
- UNASSIGNED
- RESTARTING
- STOPPED
worker_id:
type: string
trace:
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/components/Topics/Topic/Statistics/Metrics.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const Metrics: React.FC = () => {
buttonSize="M"
permission={{
resource: ResourceType.TOPIC,
action: Action.MESSAGES_READ,
action: Action.ANALYSIS_RUN,
value: params.topicName,
}}
>
Expand Down Expand Up @@ -110,7 +110,7 @@ const Metrics: React.FC = () => {
buttonSize="S"
permission={{
resource: ResourceType.TOPIC,
action: Action.MESSAGES_READ,
action: Action.ANALYSIS_RUN,
value: params.topicName,
}}
>
Expand Down
Loading

0 comments on commit 9871378

Please sign in to comment.