Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] - Integration of Cassandra DAO and DML Generator #2067

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class Constants {

/* The source type value for MySql databases */
public static final String MYSQL_SOURCE_TYPE = "mysql";

/* The source type value for Cassandra databases */
public static final String CASSANDRA_SOURCE_TYPE = "cassandra";

/* The value for Oracle databases in the source type key */
public static final String ORACLE_SOURCE_TYPE = "oracle";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public class Schema implements Serializable {
/** Maps the HarbourBridge table ID to the Spanner table details. */
private final Map<String, SpannerTable> spSchema;
private Map<String, SpannerTable> spSchema;

/** Maps the Spanner table ID to the synthetic PK. */
private final Map<String, SyntheticPKey> syntheticPKeys;
Expand Down Expand Up @@ -80,6 +80,10 @@ public Map<String, SpannerTable> getSpSchema() {
return spSchema;
}

/**
 * Replaces the map of table ID to Spanner table details held by this schema.
 *
 * @param spSchema the new map to store
 * @return the map that was just stored (the same reference that was passed in)
 */
public Map<String, SpannerTable> setSpSchema(Map<String, SpannerTable> spSchema) {
  this.spSchema = spSchema;
  return this.spSchema;
}

/**
 * Returns the synthetic primary key map held by this schema.
 *
 * @return the stored map itself (no copy is made)
 */
public Map<String, SyntheticPKey> getSyntheticPks() {
  return this.syntheticPKeys;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.shard;

import java.util.Objects;

/**
 * Connection settings for a Cassandra source, extending the generic {@link Shard} fields (logical
 * shard id, host, port, user name, password) with Cassandra-specific options.
 *
 * <p>NOTE(review): instances appear to be populated by Gson using identity field naming, so the
 * private field names below would double as the accepted JSON keys. Confirm before renaming any
 * field.
 */
public class CassandraShard extends Shard {
  private String keyspace;
  private String consistencyLevel = "LOCAL_QUORUM";
  private boolean sslOptions = false;
  private String protocolVersion = "v5";
  private String dataCenter = "datacenter1";
  private int localPoolSize = 1024;
  private int remotePoolSize = 256;

  /** Returns the configured keyspace name, or {@code null} if none was set. */
  public String getKeySpaceName() {
    return keyspace;
  }

  /** Returns the consistency level name; defaults to {@code "LOCAL_QUORUM"}. */
  public String getConsistencyLevel() {
    return consistencyLevel;
  }

  /** Returns the SSL flag; defaults to {@code false}. */
  public Boolean getSSLOptions() {
    return sslOptions;
  }

  /** Returns the protocol version string; defaults to {@code "v5"}. */
  public String getProtocolVersion() {
    return protocolVersion;
  }

  /** Returns the data center name; defaults to {@code "datacenter1"}. */
  public String getDataCenter() {
    return dataCenter;
  }

  /** Returns the local pool size; defaults to {@code 1024}. */
  public Integer getLocalPoolSize() {
    return localPoolSize;
  }

  /** Returns the remote pool size; defaults to {@code 256}. */
  public Integer getRemotePoolSize() {
    return remotePoolSize;
  }

  public void setKeySpaceName(String keySpaceName) {
    this.keyspace = keySpaceName;
  }

  public void setConsistencyLevel(String consistencyLevel) {
    this.consistencyLevel = consistencyLevel;
  }

  public void setSslOptions(boolean sslOptions) {
    this.sslOptions = sslOptions;
  }

  public void setProtocolVersion(String protocolVersion) {
    this.protocolVersion = protocolVersion;
  }

  public void setDataCenter(String dataCenter) {
    this.dataCenter = dataCenter;
  }

  public void setLocalPoolSize(int localPoolSize) {
    this.localPoolSize = localPoolSize;
  }

  public void setRemotePoolSize(int remotePoolSize) {
    this.remotePoolSize = remotePoolSize;
  }

  /**
   * Checks that every mandatory connection setting is present.
   *
   * <p>Host, port, user name, password and keyspace must each be non-null and non-empty. The
   * checks run in that order and the first failure is reported.
   *
   * @throws IllegalArgumentException if any mandatory setting is null or empty
   */
  public void validate() throws IllegalArgumentException {
    requirePresent(getHost(), "Host is required");
    requirePresent(getPort(), "Port is required");
    requirePresent(getUserName(), "Username is required");
    // The previous implementation tested the user name here, so an empty password slipped through.
    requirePresent(getPassword(), "Password is required");
    requirePresent(keyspace, "Keyspace is required");
  }

  /** Throws {@link IllegalArgumentException} with {@code message} if {@code value} is null or empty. */
  private static void requirePresent(String value, String message) {
    if (value == null || value.isEmpty()) {
      throw new IllegalArgumentException(message);
    }
  }

  /** Returns a description of this shard; the password is deliberately not included. */
  @Override
  public String toString() {
    return "CassandraShard{"
        + "logicalShardId='"
        + getLogicalShardId()
        + '\''
        + ", host='"
        + getHost()
        + '\''
        + ", port='"
        + getPort()
        + '\''
        + ", user='"
        + getUserName()
        + '\''
        + ", keySpaceName='"
        + keyspace
        + '\''
        + ", datacenter='"
        + dataCenter
        + '\''
        + ", consistencyLevel='"
        + consistencyLevel
        + '\''
        + ", protocolVersion="
        + protocolVersion
        + '}';
  }

  /**
   * Two Cassandra shards are equal when the inherited connection fields and all Cassandra-specific
   * settings match.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof CassandraShard)) {
      return false;
    }
    CassandraShard other = (CassandraShard) o;
    // Primitives are compared directly to avoid needless boxing.
    return sslOptions == other.sslOptions
        && localPoolSize == other.localPoolSize
        && remotePoolSize == other.remotePoolSize
        && Objects.equals(getLogicalShardId(), other.getLogicalShardId())
        && Objects.equals(getHost(), other.getHost())
        && Objects.equals(getPort(), other.getPort())
        && Objects.equals(getUserName(), other.getUserName())
        && Objects.equals(getPassword(), other.getPassword())
        && Objects.equals(keyspace, other.keyspace)
        && Objects.equals(dataCenter, other.dataCenter)
        && Objects.equals(consistencyLevel, other.consistencyLevel)
        && Objects.equals(protocolVersion, other.protocolVersion);
  }

  /** Hashes exactly the fields that {@link #equals(Object)} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(
        getLogicalShardId(),
        getHost(),
        getPort(),
        getUserName(),
        getPassword(),
        keyspace,
        dataCenter,
        consistencyLevel,
        protocolVersion,
        sslOptions,
        localPoolSize,
        remotePoolSize);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,19 @@
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.teleport.v2.spanner.ddl.Column;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.InformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ColumnPK;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NameAndCols;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerColumnDefinition;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerColumnType;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerTable;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;

Expand All @@ -43,4 +54,65 @@ public static Ddl getInformationSchemaAsDdl(SpannerConfig spannerConfig) {
spannerAccessor.close();
return ddl;
}

/**
 * Converts DDL tables into {@link SpannerTable} objects keyed by table name.
 *
 * @param tables the DDL tables to convert
 * @return a map from each table's name to its converted {@link SpannerTable}
 * @throws IllegalStateException if two tables share the same name, because {@code
 *     Collectors.toMap} rejects duplicate keys
 */
public static Map<String, SpannerTable> convertDDLTableToSpannerTable(Collection<Table> tables) {
  return tables.stream()
      .collect(
          Collectors.toMap(
              ddlTable -> ddlTable.name(), ddlTable -> convertTableToSpannerTable(ddlTable)));
}

/**
 * Converts DDL tables into {@link NameAndCols} objects keyed by table name.
 *
 * @param tables the DDL tables to convert
 * @return a map from each table's name to its {@link NameAndCols} representation
 * @throws IllegalStateException if two tables share the same name, because {@code
 *     Collectors.toMap} rejects duplicate keys
 */
public static Map<String, NameAndCols> convertDDLTableToSpannerNameAndColsTable(
    Collection<Table> tables) {
  return tables.stream()
      .collect(
          Collectors.toMap(
              ddlTable -> ddlTable.name(),
              ddlTable -> convertTableToSpannerTableNameAndCols(ddlTable)));
}

/**
 * Builds a {@link NameAndCols} for one DDL table.
 *
 * <p>Each column name is mapped to itself, so the column name serves as both key and value.
 *
 * @param table the DDL table to convert
 * @return a {@link NameAndCols} carrying the table name and the column-name map
 */
private static NameAndCols convertTableToSpannerTableNameAndCols(Table table) {
  String tableName = table.name();
  Map<String, String> columnNames =
      table.columns().stream()
          .collect(Collectors.toMap(column -> column.name(), column -> column.name()));
  return new NameAndCols(tableName, columnNames);
}

/**
 * Builds a {@link SpannerTable} from one DDL table.
 *
 * <p>Column names are used as column IDs, and the table name is passed as both the first and the
 * last constructor argument of {@link SpannerTable}. Primary key columns are numbered from 1 in
 * the order {@code table.primaryKeys()} yields them.
 *
 * @param table the DDL table to convert
 * @return the converted {@link SpannerTable}
 */
private static SpannerTable convertTableToSpannerTable(Table table) {
  final String tableName = table.name();

  // Column names double as column IDs.
  final String[] columnIds =
      table.columns().stream().map(column -> column.name()).toArray(String[]::new);

  // One definition per column, keyed by column name.
  final Map<String, SpannerColumnDefinition> columnDefinitions =
      table.columns().stream()
          .collect(
              Collectors.toMap(
                  Column::name,
                  column -> {
                    String columnName = column.name();
                    // NOTE(review): the second argument is hard-coded to false for every
                    // column; presumably it is an "is array" flag — confirm that array-typed
                    // columns do not need true here.
                    SpannerColumnType columnType =
                        new SpannerColumnType(column.typeString(), false);
                    return new SpannerColumnDefinition(columnName, columnType);
                  }));

  // Number the primary key columns 1, 2, 3, ... in encounter order.
  final AtomicInteger lastAssignedOrder = new AtomicInteger(0);
  final ColumnPK[] primaryKeyColumns =
      table.primaryKeys().stream()
          .map(pk -> new ColumnPK(pk.name(), lastAssignedOrder.incrementAndGet()))
          .toArray(ColumnPK[]::new);

  return new SpannerTable(tableName, columnIds, columnDefinitions, primaryKeyColumns, tableName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads a Cassandra configuration file through Beam {@link FileSystems} (for example from GCS) and
 * converts its JSON content into a {@link CassandraShard}.
 */
public class CassandraConfigFileReader {

  private static final Logger LOG = LoggerFactory.getLogger(CassandraConfigFileReader.class);

  /** Message used both for the log entry and for the exception raised on a read failure. */
  private static final String READ_FAILURE_MESSAGE =
      "Failed to read Cassandra config file. Make sure it is ASCII or UTF-8 encoded and contains"
          + " a well-formed JSON string.";

  /**
   * Loads the Cassandra shard described by the JSON file at the given path.
   *
   * <p>The JSON is mapped onto {@link CassandraShard} using identity field naming, i.e. JSON keys
   * must match the Java field names exactly.
   *
   * <p>NOTE(review): malformed JSON makes Gson throw its own unchecked exception, which is not
   * caught here and therefore propagates unchanged. The parsed shard is also not validated;
   * callers may want to invoke {@code CassandraShard.validate()}.
   *
   * @param cassandraConfigFilePath path of the config file, resolved via Beam {@link FileSystems}
   * @return a single-element list holding the parsed shard
   * @throws RuntimeException if the file cannot be read, or if it is empty so that no shard can
   *     be parsed from it
   */
  public List<Shard> getCassandraShard(String cassandraConfigFilePath) {
    try (InputStream stream =
        Channels.newInputStream(
            FileSystems.open(FileSystems.matchNewResource(cassandraConfigFilePath, false)))) {

      String result = IOUtils.toString(stream, StandardCharsets.UTF_8);
      Shard iShard =
          new GsonBuilder()
              .setFieldNamingPolicy(FieldNamingPolicy.IDENTITY)
              .create()
              .fromJson(result, CassandraShard.class);

      // Gson yields null for an empty document; fail here with a clear message instead of
      // handing callers a list that contains null.
      if (iShard == null) {
        throw new RuntimeException(
            "Cassandra config file is empty or contains no JSON object: "
                + cassandraConfigFilePath);
      }

      LOG.info("The Cassandra config is: {}", iShard);
      return Collections.singletonList(iShard);

    } catch (IOException e) {
      LOG.error(READ_FAILURE_MESSAGE, e);
      throw new RuntimeException(READ_FAILURE_MESSAGE, e);
    }
  }
}
5 changes: 5 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version> <!-- Use the latest version -->
</dependency>
<!-- TODO - Remove when https://github.com/apache/beam/pull/29732 is released. -->
<dependency>
<groupId>com.google.cloud.teleport</groupId>
Expand Down
Loading