Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek committed Sep 24, 2024
1 parent c7e6e62 commit 8668073
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTableTemplate;
import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateTopicTemplate;
import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDescribeTemplate;
import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveDropTopicTemplate;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
Expand All @@ -44,6 +45,7 @@ public final class RisingwaveBindingConfig
public final KindConfig kind;
public final List<RisingwaveRouteConfig> routes;
public final RisingwaveCreateTopicTemplate createTopic;
public final RisingwaveDropTopicTemplate deleteTopic;
public final RisingwaveCreateMaterializedViewTemplate createView;
public final RisingwaveDescribeTemplate describeView;
public final RisingwaveCreateTableTemplate createTable;
Expand Down Expand Up @@ -87,6 +89,7 @@ public RisingwaveBindingConfig(
location, config.kafkaScanStartupTimestampMillis());
this.createSink = new RisingwaveCreateSinkTemplate(bootstrapServer, location);
this.createTopic = new RisingwaveCreateTopicTemplate();
this.deleteTopic = new RisingwaveDropTopicTemplate();
this.createView = new RisingwaveCreateMaterializedViewTemplate();
this.describeView = new RisingwaveDescribeTemplate();
this.createFunction = new RisingwaveCreateFunctionTemplate(udf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public enum RisingwaveCommandType
{
CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()),
CREATE_TABLE_COMMAND("CREATE TABLE".getBytes()),
DROP_TABLE_COMMAND("DROP TABLE".getBytes()),
CREATE_MATERIALIZED_VIEW_COMMAND("CREATE MATERIALIZED VIEW".getBytes()),
CREATE_FUNCTION_COMMAND("CREATE FUNCTION".getBytes()),
UNKNOWN_COMMAND("UNKNOWN".getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

public abstract class RisingwaveCommandTemplate
{
private final CCJSqlParserManager parserManager = new CCJSqlParserManager();
private final Map<String, String> includeMap = new Object2ObjectHashMap<>();
protected final CCJSqlParserManager parserManager = new CCJSqlParserManager();
protected final Map<String, String> includeMap = new Object2ObjectHashMap<>();

protected final StringBuilder includeBuilder = new StringBuilder();
protected static final Map<String, String> ZILLA_MAPPINGS = new Object2ObjectHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.risingwave.internal.statement;

import java.io.StringReader;

import org.agrona.DirectBuffer;

import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.statement.drop.Drop;

public class RisingwaveDropTopicTemplate extends RisingwaveCommandTemplate
{
private final String sqlFormat = """
DROP TOPIC %s;\u0000""";

public RisingwaveDropTopicTemplate()
{
}

public String generate(
Drop dropTable)
{
String topic = dropTable.getName().getName();
return String.format(sqlFormat, topic);
}

public Drop parserDropTable(
DirectBuffer buffer,
int offset,
int length)
{
String query = buffer.getStringWithoutLengthUtf8(offset, length);

Drop dropTable = null;

try
{
dropTable = (Drop) parserManager.parse(new StringReader(query));
}
catch (JSQLParserException ignore)
{
}

return dropTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum RisingwaveCompletionCommand
{
UNKNOWN_COMMAND("UNKNOWN".getBytes()),
CREATE_TABLE_COMMAND("CREATE_TABLE".getBytes()),
DROP_TABLE_COMMAND("DROP_TABLE".getBytes()),
CREATE_MATERIALIZED_VIEW_COMMAND("CREATE_MATERIALIZED_VIEW".getBytes()),
CREATE_FUNCTION_COMMAND("CREATE_FUNCTION".getBytes());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory
Object2ObjectHashMap<RisingwaveCommandType, PgsqlTransform> clientTransforms =
new Object2ObjectHashMap<>();
clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::decodeCreateTableCommand);
clientTransforms.put(RisingwaveCommandType.DROP_TABLE_COMMAND, this::decodeDropTableCommand);
clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::decodeCreateMaterializedViewCommand);
clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::decodeCreateFunctionCommand);
clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand);
Expand Down Expand Up @@ -497,10 +498,10 @@ private void onAppWindow(
}

private void onCommandCompleted(
long traceId,
long authorization,
int progress,
RisingwaveCompletionCommand command)
long traceId,
long authorization,
int progress,
RisingwaveCompletionCommand command)
{
final MutableDirectBuffer parserBuffer = bufferPool.buffer(parserSlot);

Expand Down Expand Up @@ -1504,6 +1505,47 @@ else if (server.commandsProcessed == 1)
}
}

private void decodeDropTableCommand(
PgsqlServer server,
long traceId,
long authorization,
DirectBuffer buffer,
int offset,
int length)
{
if (server.commandsProcessed == 2)
{
server.onCommandCompleted(traceId, authorization, length, RisingwaveCompletionCommand.DROP_TABLE_COMMAND);
}
else
{
final RisingwaveBindingConfig binding = server.binding;
final RisingwaveCreateTableCommand command = binding.deleteTopic.parserDropTable(buffer, offset, length);

String newStatement = "";
int progress = 0;

if (server.commandsProcessed == 0)
{
newStatement = binding.deleteTopic.generate(command.createTable);
}
else if (server.commandsProcessed == 1)
{
newStatement = binding.createSource.generate(server.database, command);
}

statementBuffer.putBytes(progress, newStatement.getBytes());
progress += newStatement.length();

final RisingwaveRouteConfig route =
server.binding.resolve(authorization, statementBuffer, 0, progress);

final PgsqlClient client = server.streamsByRouteIds.get(route.id);
client.doPgsqlQuery(traceId, authorization, statementBuffer, 0, progress);
client.typeCommand = ignoreFlushCommand;
}
}

private void decodeCreateMaterializedViewCommand(
PgsqlServer server,
long traceId,
Expand Down

0 comments on commit 8668073

Please sign in to comment.