Skip to content

Commit

Permalink
feat: add Kafka quota support for Aiven (#264)
Browse files Browse the repository at this point in the history
Resolved: #264
  • Loading branch information
fhussonnois committed Sep 5, 2023
1 parent b9bec9a commit 8e1b14f
Show file tree
Hide file tree
Showing 36 changed files with 2,033 additions and 77 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ docker-tag-latest: build-images
docker tag ${REPOSITORY}/${IMAGE}:${VERSION} ${REPOSITORY}/${IMAGE}:latest || exit 1 ;

changelog:
./mvnw jreleaser:changelog -Prelease -f ./jikkou-cli/pom.xml
./mvnw jreleaser:changelog -Prelease -f ./jikkou-cli/pom.xml

install:
./mvnw clean package -Pnative,deb -DskipTests
sudo dpkg -i ./dist/jikkou-${VERSION}-linux-x86_64.deb

clean: clean-containers clean-images clean-build
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,12 @@ public List<HasMetadata> getResources(final @NotNull ResourceType resourceType,
ResourceConverter<HasMetadata, HasMetadata> converter = collector.getResourceConverter(resourceType);
List<HasMetadata> result = resources.stream()
.map(resource -> {
ObjectMeta.ObjectMetaBuilder objectMetaBuilder = resource.getMetadata().toBuilder();
return resource.withMetadata(
objectMetaBuilder.withAnnotation(ObjectMeta.ANNOT_GENERATED, Instant.now()).build()
);
ObjectMeta meta = resource
.optionalMetadata()
.orElse(new ObjectMeta()).toBuilder()
.withAnnotation(ObjectMeta.ANNOT_GENERATED, Instant.now())
.build();
return resource.withMetadata(meta);
})
.toList();
return converter.convertTo(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ protected ChangeType getChangeType(T before, T after) {
return before == null ? ADD : after == null ? DELETE : UPDATE;
}

private record BeforeAndAfter<T>(T before, T after) {
}
private record BeforeAndAfter<T>(T before, T after) { }

@FunctionalInterface
public interface ChangeValueMapper<T, V> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.jikkou.extension.aiven.api;

import io.streamthoughts.jikkou.JikkouMetadataAnnotations;
import io.streamthoughts.jikkou.api.ReconciliationContext;
import io.streamthoughts.jikkou.api.ReconciliationMode;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.ChangeResult;
import io.streamthoughts.jikkou.api.control.ValueChange;
import io.streamthoughts.jikkou.api.model.ObjectMeta;
import io.streamthoughts.jikkou.api.selector.ResourceSelector;
import io.streamthoughts.jikkou.extension.aiven.adapter.KafkaQuotaAdapter;
import io.streamthoughts.jikkou.extension.aiven.api.data.KafkaQuotaEntry;
import io.streamthoughts.jikkou.extension.aiven.control.KafkaQuotaCollector;
import io.streamthoughts.jikkou.extension.aiven.control.KafkaQuotaController;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaQuota;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaQuotaSpec;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
class KafkaQuotaEntryIT {

public static final List<ResourceSelector> NO_SELECTOR = Collections.emptyList();

public static MockWebServer SERVER;

private static KafkaQuotaController CONTROLLER;
private static KafkaQuotaCollector COLLECTOR;

@BeforeAll
static void setUp() throws IOException {
SERVER = new MockWebServer();
SERVER.start();

Configuration configuration = new Configuration
.Builder()
.with(AivenApiClientConfig.AIVEN_API_URL.key(), SERVER.url("/"))
.with(AivenApiClientConfig.AIVEN_PROJECT.key(), "project")
.with(AivenApiClientConfig.AIVEN_SERVICE.key(), "service")
.with(AivenApiClientConfig.AIVEN_TOKEN_AUTH.key(), "token")
.with(AivenApiClientConfig.AIVEN_DEBUG_LOGGING_ENABLED.key(), true)
.build();
COLLECTOR = new KafkaQuotaCollector(new AivenApiClientConfig(configuration));
CONTROLLER = new KafkaQuotaController(new AivenApiClientConfig(configuration));
}

@AfterAll
static void tearDown() throws IOException {
COLLECTOR.close();
CONTROLLER.close();
SERVER.shutdown();
}

@Test
void shouldListKafkaQuotaEntries() {
// Given
SERVER.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setResponseCode(200)
.setBody("""
{"quotas":[{"client-id":"default","consumer_byte_rate":1048576.0,"producer_byte_rate":1048576.0,"request_percentage":25.0,"user":"default"}]}
"""
));
// When
List<V1KafkaQuota> results = COLLECTOR.listAll(Configuration.empty(), NO_SELECTOR);

// Then
Assertions.assertNotNull(results);
V1KafkaQuota expected = V1KafkaQuota.builder()
.withSpec(V1KafkaQuotaSpec
.builder()
.withUser("default")
.withClientId("default")
.withProducerByteRate(1048576.0)
.withConsumerByteRate(1048576.0)
.withRequestPercentage(25.0)
.build()
)
.build();
Assertions.assertEquals(List.of(expected), results);
}

@Test
void shouldCreateKafkaQuota() {
// Given
SERVER.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setResponseCode(200)
.setBody("""
{"quotas":[{"client-id":"default","consumer_byte_rate":1048576.0,"producer_byte_rate":1048576.0,"request_percentage":25.0,"user":"default"}]}
"""
));
SERVER.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setResponseCode(200)
.setBody("""
{"message":"upsert"}
"""
));

V1KafkaQuota entry = V1KafkaQuota.builder()
.withSpec(V1KafkaQuotaSpec
.builder()
.withUser("test")
.withClientId("test")
.withProducerByteRate(1048576.0)
.withConsumerByteRate(1048576.0)
.withRequestPercentage(25.0)
.build()
)
.build();

// When
List<ChangeResult<ValueChange<KafkaQuotaEntry>>> results = CONTROLLER
.reconcile(List.of(entry), ReconciliationMode.CREATE, ReconciliationContext.with(false));

// Then
ValueChange<KafkaQuotaEntry> expected = ValueChange.withAfterValue(KafkaQuotaAdapter.map(entry));
Assertions.assertEquals(expected, results.get(0).data());
}


@Test
void shouldDeleteKafkaQuota() {
// Given
SERVER.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setResponseCode(200)
.setBody("""
{"quotas":[{"client-id":"default","consumer_byte_rate":1048576.0,"producer_byte_rate":1048576.0,"request_percentage":25.0,"user":"default"}]}
"""
));
SERVER.enqueue(new MockResponse()
.setHeader("Content-Type", "application/json")
.setResponseCode(200)
.setBody("""
{"message":"Deleted quota for for (User: default, Client-id: default)"}
"""

));
// When
V1KafkaQuota entry = V1KafkaQuota.builder()
.withMetadata(ObjectMeta.builder()
.withAnnotation(JikkouMetadataAnnotations.JIKKOU_IO_DELETE, true)
.build())
.withSpec(V1KafkaQuotaSpec
.builder()
.withUser("default")
.withClientId("default")
.withProducerByteRate(1048576.0)
.withConsumerByteRate(1048576.0)
.withRequestPercentage(25.0)
.build()
)
.build();

// When
List<ChangeResult<ValueChange<KafkaQuotaEntry>>> results = CONTROLLER
.reconcile(List.of(entry), ReconciliationMode.DELETE, ReconciliationContext.with(false));

// Then
ValueChange<KafkaQuotaEntry> expected = ValueChange.withBeforeValue(KafkaQuotaAdapter.map(entry));
Assertions.assertEquals(expected, results.get(0).data());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.extensions.ExtensionFactory;
import io.streamthoughts.jikkou.extension.aiven.control.KafkaQuotaCollector;
import io.streamthoughts.jikkou.extension.aiven.control.KafkaQuotaController;
import io.streamthoughts.jikkou.extension.aiven.control.KafkaTopicAclEntryCollector;
import io.streamthoughts.jikkou.extension.aiven.control.KafkaTopicAclEntryController;
import io.streamthoughts.jikkou.extension.aiven.control.SchemaRegistryAclEntryCollector;
Expand All @@ -36,5 +38,7 @@ public void registerExtensions(@NotNull ExtensionFactory factory,
factory.register(SchemaRegistryAclEntryCollector.class, SchemaRegistryAclEntryCollector::new);
factory.register(SchemaRegistryAclEntryController.class, SchemaRegistryAclEntryController::new);
factory.register(SchemaRegistryAclEntryValidation.class, SchemaRegistryAclEntryValidation::new);
factory.register(KafkaQuotaCollector.class, KafkaQuotaCollector::new);
factory.register(KafkaQuotaController.class, KafkaQuotaController::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.streamthoughts.jikkou.extension.aiven;

import io.streamthoughts.jikkou.api.ResourceContext;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaQuota;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaQuotaList;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaTopicAclEntry;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaTopicAclEntryList;
import io.streamthoughts.jikkou.extension.aiven.models.V1SchemaRegistryAclEntry;
Expand All @@ -31,5 +33,7 @@ public void registerAll(ResourceContext context) {
context.register(V1KafkaTopicAclEntryList.class);
context.register(V1SchemaRegistryAclEntry.class);
context.register(V1SchemaRegistryAclEntryList.class);
context.register(V1KafkaQuota.class);
context.register(V1KafkaQuotaList.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.jikkou.extension.aiven.adapter;

import io.streamthoughts.jikkou.extension.aiven.api.data.KafkaQuotaEntry;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaQuota;
import io.streamthoughts.jikkou.extension.aiven.models.V1KafkaQuotaSpec;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;

/**
*
*/
public final class KafkaQuotaAdapter {

public static final String DEFAULT = "default";

public static KafkaQuotaEntry map(final @NotNull V1KafkaQuota entry) {
return new KafkaQuotaEntry(
Optional.ofNullable(entry.getSpec().getClientId()).orElse(DEFAULT),
Optional.ofNullable(entry.getSpec().getUser()).orElse(DEFAULT),
entry.getSpec().getConsumerByteRate(),
entry.getSpec().getProducerByteRate(),
entry.getSpec().getRequestPercentage()
);
}

public static V1KafkaQuota map(final @NotNull KafkaQuotaEntry entry) {
return V1KafkaQuota.builder()
.withSpec(V1KafkaQuotaSpec
.builder()
.withClientId(entry.clientId())
.withUser(entry.user())
.withConsumerByteRate(entry.consumerByteRate())
.withProducerByteRate(entry.producerByteRate())
.withRequestPercentage(entry.requestPercentage())
.build()
)
.build();
}

private KafkaQuotaAdapter() {}
}
Loading

0 comments on commit 8e1b14f

Please sign in to comment.