Skip to content

Commit

Permalink
fix(provider/kafka): fix NPE when configuring default quota (#489)
Browse files Browse the repository at this point in the history
Fix: #489
  • Loading branch information
fhussonnois committed Sep 7, 2024
1 parent 350d439 commit 8f65159
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package io.streamthoughts.jikkou.kafka.change.quota;

import io.streamthoughts.jikkou.common.utils.Pair;
import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.models.change.StateChange;
Expand All @@ -15,6 +16,7 @@
import io.streamthoughts.jikkou.core.reconciler.TextDescription;
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -97,7 +99,13 @@ public KafkaClientQuotaChangeHandler(@NotNull final AdminClient client,

@NotNull
private static ClientQuotaEntity getClientQuotaEntity(ResourceChange resource) {
return new ClientQuotaEntity(TypeConverter.<String, String>ofMap().convertValue(resource.getSpec().getData()));
Map<String, String> entries = TypeConverter.<String, String>ofMap()
.convertValue(resource.getSpec().getData())
.entrySet()
.stream()
.map(entry -> Pair.of(entry).mapRight(v -> v.isEmpty() ? null : v))
.collect(HashMap::new, (m, v) -> m.put(v._1(), v._2()), HashMap::putAll);
return new ClientQuotaEntity(entries);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public boolean isValid(@NotNull final Map<String, String> entities) {
@Override
public Map<String, String> toEntities(@NotNull final KafkaClientQuotaEntity entity) {
Map<String, String> entities = new HashMap<>();
entities.put(ClientQuotaEntity.USER, DEFAULT);
entities.put(ClientQuotaEntity.USER, DEFAULT_ENTITY);
return entities;
}

Expand Down Expand Up @@ -180,7 +180,7 @@ public boolean isValid(@NotNull final Map<String, String> entities) {
public Map<String, String> toEntities(@NotNull final KafkaClientQuotaEntity entity) {
Map<String, String> entities = new HashMap<>();
entities.put(ClientQuotaEntity.USER, entity.getUser());
entities.put(ClientQuotaEntity.CLIENT_ID, DEFAULT);
entities.put(ClientQuotaEntity.CLIENT_ID, DEFAULT_ENTITY);
return entities;
}

Expand Down Expand Up @@ -222,7 +222,7 @@ public boolean isValid(@NotNull final Map<String, String> entities) {
@Override
public Map<String, String> toEntities(final KafkaClientQuotaEntity entity) {
Map<String, String> entities = new HashMap<>();
entities.put(ClientQuotaEntity.CLIENT_ID, DEFAULT);
entities.put(ClientQuotaEntity.CLIENT_ID, DEFAULT_ENTITY);
return entities;
}

Expand Down Expand Up @@ -268,7 +268,7 @@ public String toPettyString(final @NotNull Map<String, String> entities) {
}
};

public static final String DEFAULT = null;
public static final String DEFAULT_ENTITY = "";

/**
* Validates the given map of quota entities for this type.
Expand Down

0 comments on commit 8f65159

Please sign in to comment.