From 8668073d99d0dd0c4e8e2a57963b40975e96b017 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Mon, 23 Sep 2024 18:31:17 -0700 Subject: [PATCH] WIP --- .../config/RisingwaveBindingConfig.java | 3 + .../config/RisingwaveCommandType.java | 1 + .../statement/RisingwaveCommandTemplate.java | 4 +- .../RisingwaveDropTopicTemplate.java | 59 +++++++++++++++++++ .../stream/RisingwaveCompletionCommand.java | 1 + .../stream/RisingwaveProxyFactory.java | 50 ++++++++++++++-- 6 files changed, 112 insertions(+), 6 deletions(-) create mode 100644 incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDropTopicTemplate.java diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java index 602e124808..c23a5fd718 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java @@ -31,6 +31,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTableTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTopicTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDescribeTemplate; +import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDropTopicTemplate; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.CatalogedConfig; @@ -44,6 +45,7 @@ public final class RisingwaveBindingConfig public final KindConfig kind; public final List routes; public final RisingwaveCreateTopicTemplate createTopic; + public final RisingwaveDropTopicTemplate deleteTopic; public final RisingwaveCreateMaterializedViewTemplate createView; public final RisingwaveDescribeTemplate describeView; public final RisingwaveCreateTableTemplate createTable; @@ -87,6 +89,7 @@ public RisingwaveBindingConfig( location, config.kafkaScanStartupTimestampMillis()); this.createSink = new RisingwaveCreateSinkTemplate(bootstrapServer, location); this.createTopic = new RisingwaveCreateTopicTemplate(); + this.deleteTopic = new RisingwaveDropTopicTemplate(); this.createView = new RisingwaveCreateMaterializedViewTemplate(); this.describeView = new RisingwaveDescribeTemplate(); this.createFunction = new RisingwaveCreateFunctionTemplate(udf); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java index c0a0409270..7f911c5942 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java @@ -20,6 +20,7 @@ public enum RisingwaveCommandType { CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()), CREATE_TABLE_COMMAND("CREATE TABLE".getBytes()), + DROP_TABLE_COMMAND("DROP TABLE".getBytes()), CREATE_MATERIALIZED_VIEW_COMMAND("CREATE MATERIALIZED VIEW".getBytes()), CREATE_FUNCTION_COMMAND("CREATE FUNCTION".getBytes()), UNKNOWN_COMMAND("UNKNOWN".getBytes()); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java index d97fb34c48..5ef41ec304 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java @@ -27,8 +27,8 @@ public abstract class RisingwaveCommandTemplate { - private final CCJSqlParserManager parserManager = new CCJSqlParserManager(); - private final Map includeMap = new Object2ObjectHashMap<>(); + protected final CCJSqlParserManager parserManager = new CCJSqlParserManager(); + protected final Map includeMap = new Object2ObjectHashMap<>(); protected final StringBuilder includeBuilder = new StringBuilder(); protected static final Map ZILLA_MAPPINGS = new Object2ObjectHashMap<>(); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDropTopicTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDropTopicTemplate.java new file mode 100644 index 0000000000..a7b82bfdca --- /dev/null +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveDropTopicTemplate.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; + +import java.io.StringReader; + +import org.agrona.DirectBuffer; + +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.statement.drop.Drop; + +public class RisingwaveDropTopicTemplate extends RisingwaveCommandTemplate +{ + private final String sqlFormat = """ + DROP TOPIC %s;\u0000"""; + + public RisingwaveDropTopicTemplate() + { + } + + public String generate( + Drop dropTable) + { + String topic = dropTable.getName().getName(); + return String.format(sqlFormat, topic); + } + + public Drop parserDropTable( + DirectBuffer buffer, + int offset, + int length) + { + String query = buffer.getStringWithoutLengthUtf8(offset, length); + + Drop dropTable = null; + + try + { + dropTable = (Drop) parserManager.parse(new StringReader(query)); + } + catch (JSQLParserException ignore) + { + } + + return dropTable; + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java index b80098e3e8..b0e52b80b4 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java @@ -18,6 +18,7 @@ public enum RisingwaveCompletionCommand { UNKNOWN_COMMAND("UNKNOWN".getBytes()), CREATE_TABLE_COMMAND("CREATE_TABLE".getBytes()), + DROP_TABLE_COMMAND("DROP_TABLE".getBytes()), CREATE_MATERIALIZED_VIEW_COMMAND("CREATE_MATERIALIZED_VIEW".getBytes()), CREATE_FUNCTION_COMMAND("CREATE_FUNCTION".getBytes()); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index 418a848edd..8d980ec718 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -144,6 +144,7 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory Object2ObjectHashMap clientTransforms = new Object2ObjectHashMap<>(); clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::decodeCreateTableCommand); + clientTransforms.put(RisingwaveCommandType.DROP_TABLE_COMMAND, this::decodeDropTableCommand); clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::decodeCreateMaterializedViewCommand); clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::decodeCreateFunctionCommand); clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand); @@ -497,10 +498,10 @@ private void onAppWindow( } private void onCommandCompleted( - long traceId, - long authorization, - int progress, - RisingwaveCompletionCommand command) + long traceId, + long authorization, + int progress, + RisingwaveCompletionCommand command) { final MutableDirectBuffer parserBuffer = bufferPool.buffer(parserSlot); @@ -1504,6 +1505,47 @@ else if (server.commandsProcessed == 1) } } + private void decodeDropTableCommand( + PgsqlServer server, + long traceId, + long authorization, + DirectBuffer buffer, + int offset, + int length) + { + if (server.commandsProcessed == 2) + { + server.onCommandCompleted(traceId, authorization, length, RisingwaveCompletionCommand.DROP_TABLE_COMMAND); + } + else + { + final RisingwaveBindingConfig binding = server.binding; + final RisingwaveCreateTableCommand command = binding.deleteTopic.parserDropTable(buffer, offset, length); + + String newStatement = ""; + int progress = 0; + + if (server.commandsProcessed == 0) + { + newStatement = binding.deleteTopic.generate(command.createTable); + } + else if (server.commandsProcessed == 1) + { + newStatement = binding.createSource.generate(server.database, command); + } + + statementBuffer.putBytes(progress, newStatement.getBytes()); + progress += newStatement.length(); + + final RisingwaveRouteConfig route = + server.binding.resolve(authorization, statementBuffer, 0, progress); + + final PgsqlClient client = server.streamsByRouteIds.get(route.id); + client.doPgsqlQuery(traceId, authorization, statementBuffer, 0, progress); + client.typeCommand = ignoreFlushCommand; + } + } + private void decodeCreateMaterializedViewCommand( PgsqlServer server, long traceId,