Skip to content

Commit

Permalink
Cassandra Schema and Value Mapping (#2048)
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle authored Dec 14, 2024
1 parent 197765a commit cd28d01
Show file tree
Hide file tree
Showing 23 changed files with 1,298 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.Period;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
Expand All @@ -43,6 +44,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -563,6 +565,48 @@ static void handleDatastreamRecordType(
.withZoneSameInstant(ZoneId.of("UTC"))
.format(DEFAULT_TIMESTAMP_WITH_TZ_FORMATTER));
break;
/*
 * `intervalNano` maps to the nanosecond-precision interval type used by Cassandra.
 * On Spanner this will map to a `string` or `Interval` type.
 * This is added here to support DQL retries for sourcedb-to-spanner.
 *
 * TODO(b/383689307):
 * There's a lot of commonality in handling avro types between {@link FormatDatastreamRecordToJson} and {@link com.google.cloud.teleport.v2.spanner.migrations.avro.GenericRecordTypeConvertor}.
 * Adding an inter-package dependency might not be the best route, and we might eventually want to build a common package for handling common logic between the two.
 */
case "intervalNano":
// Date-based portion of the interval (years/months/days); null fields are treated as 0.
Period period =
Period.ZERO
.plusYears(getOrDefault(element, "years", 0L))
.plusMonths(getOrDefault(element, "months", 0L))
.plusDays(getOrDefault(element, "days", 0L));
/*
 * Convert the period to an ISO-8601 period formatted String, such as P6Y3M1D.
 * A zero period will be represented as zero days, 'P0D'.
 * Refer to javadoc for Period#toString.
 */
String periodIso8061 = period.toString();
// Time-based portion of the interval (hours/minutes/seconds/nanos); null fields are
// treated as 0. Negative nanos subtract from the seconds (Duration arithmetic).
java.time.Duration duration =
java.time.Duration.ZERO
.plusHours(getOrDefault(element, "hours", 0L))
.plusMinutes(getOrDefault(element, "minutes", 0L))
.plusSeconds(getOrDefault(element, "seconds", 0L))
.plusNanos(getOrDefault(element, "nanos", 0L));
/*
 * Convert the duration to an ISO-8601 duration formatted String, such as PT8H6M12.345S.
 * Refer to javadoc for Duration#toString.
 */
String durationIso8610 = duration.toString();
// Combine both into a single ISO-8601 interval string: Duration#toString always starts
// with "PT", so stripping the leading 'P' yields "T..." which appends directly onto the
// period's representation (e.g. "P1Y2M3D" + "T4H5M6S"). A zero duration is omitted
// entirely so a pure date interval stays e.g. "P0D" rather than "P0DT0S".
String convertedIntervalNano;
if (duration.isZero()) {
convertedIntervalNano = periodIso8061;
} else {
convertedIntervalNano =
periodIso8061 + StringUtils.removeStartIgnoreCase(durationIso8610, "P");
}
jsonObject.put(fieldName, convertedIntervalNano);
break;
default:
LOG.warn(
"Unknown field type {} for field {} in record {}.", fieldSchema, fieldName, element);
Expand All @@ -578,5 +622,12 @@ static void handleDatastreamRecordType(
break;
}
}

/**
 * Returns the value of the field {@code name} from {@code element}, falling back to
 * {@code def} when the field is absent or stored as {@code null}.
 *
 * @param element avro record to read from
 * @param name field name to look up
 * @param def default value returned for a missing/null field
 * @return the stored value cast to {@code T}, or {@code def}
 */
@SuppressWarnings("unchecked") // Callers pass a default whose type matches the field's schema.
private static <T> T getOrDefault(GenericRecord element, String name, T def) {
  // Single lookup instead of calling element.get(name) twice.
  Object value = element.get(name);
  return (value == null) ? def : (T) value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.teleport.v2.datastream.transforms.FormatDatastreamRecordToJson.UnifiedTypesFormatter;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
Expand Down Expand Up @@ -211,4 +215,128 @@ public void testLogicalType_micros() {
fieldNamePositiveNumber, fieldSchema, element, jsonObject);
assertTrue(jsonObject.get(fieldNamePositiveNumber).asText().equals("1981-11-21T11:45:11Z"));
}

@Test
public void testIntervalNano() throws JsonProcessingException {

// Shared output node: each case below inserts one field; Jackson serializes fields in
// insertion order, so the final expected JSON string depends on the order of these calls.
ObjectNode objectNode = new ObjectNode(new JsonNodeFactory(true));

/* Basic Test. */
UnifiedTypesFormatter.handleDatastreamRecordType(
"basic",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(1000L, 1000L, 3890L, 25L, 331L, 12L, 9L),
objectNode);

/* Test with any field set as null gets treated as 0. */

UnifiedTypesFormatter.handleDatastreamRecordType(
"null_minute",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(1000L, 1000L, 3890L, 25L, null, 12L, 9L),
objectNode);

/* Basic test for negative field. */

UnifiedTypesFormatter.handleDatastreamRecordType(
"neg_field_basic",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(1000L, -1000L, 3890L, 25L, 31L, 12L, 9L),
objectNode);

/* Test that negative nanos subtract from the fractional seconds, for example 12 Seconds -1 Nanos becomes 11.999999991s. */
UnifiedTypesFormatter.handleDatastreamRecordType(
"neg_fractional_seconds",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(1000L, 31L, 3890L, 25L, 31L, 12L, -9L),
objectNode);

/* Test 0 interval. Expected to collapse to the canonical zero-period form 'P0D'. */
UnifiedTypesFormatter.handleDatastreamRecordType(
"zero_interval",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(0L, 0L, 0L, 0L, 0L, 0L, 0L),
objectNode);

/* Test almost zero interval with only nanos set. */
UnifiedTypesFormatter.handleDatastreamRecordType(
"one_nano_interval",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(0L, 0L, 0L, 0L, 0L, 0L, 1L),
objectNode);
/* Test with large values. Hours/minutes/seconds overflow and get normalized by Duration. */
UnifiedTypesFormatter.handleDatastreamRecordType(
"large_values",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(
2147483647L, 11L, 2147483647L, 2147483647L, 2147483647L, 2147483647L, 999999999L),
objectNode);

/* Test with large negative values. */
UnifiedTypesFormatter.handleDatastreamRecordType(
"large_negative_values",
generateIntervalNanosSchema(),
generateIntervalNanosRecord(
-2147483647L,
-11L,
-2147483647L,
-2147483647L,
-2147483647L,
-2147483647L,
-999999999L),
objectNode);
// Expected ISO-8601 representations, in the insertion order used above.
String expected =
"{\"basic\":\"P1000Y1000M3890DT30H31M12.000000009S\","
+ "\"null_minute\":\"P1000Y1000M3890DT25H12.000000009S\","
+ "\"neg_field_basic\":\"P1000Y-1000M3890DT25H31M12.000000009S\","
+ "\"neg_fractional_seconds\":\"P1000Y31M3890DT25H31M11.999999991S\","
+ "\"zero_interval\":\"P0D\","
+ "\"one_nano_interval\":\"P0DT0.000000001S\","
+ "\"large_values\":\"P2147483647Y11M2147483647DT2183871564H21M7.999999999S\","
+ "\"large_negative_values\":\"P-2147483647Y-11M-2147483647DT-2183871564H-21M-7.999999999S\"}";
assertEquals(expected, new ObjectMapper().writeValueAsString(objectNode));
}

/**
 * Builds a generic record conforming to the interval-nano schema with the supplied component
 * values; a {@code null} component is stored as-is (the consumer defaults it to 0).
 */
private GenericRecord generateIntervalNanosRecord(
    Long years, Long months, Long days, Long hours, Long minutes, Long seconds, Long nanos) {

  GenericRecord record = new GenericData.Record(generateIntervalNanosSchema());
  String[] fields = {"years", "months", "days", "hours", "minutes", "seconds", "nanos"};
  Long[] values = {years, months, days, hours, minutes, seconds, nanos};
  // Populate every interval component in schema order.
  for (int i = 0; i < fields.length; i++) {
    record.put(fields[i], values[i]);
  }
  return record;
}

/**
 * Builds the avro schema for the {@code intervalNano} record: one {@code long} field per
 * interval component, each defaulting to 0.
 */
private Schema generateIntervalNanosSchema() {

  SchemaBuilder.FieldAssembler<Schema> assembler =
      SchemaBuilder.builder().record("intervalNano").fields();
  // Each component is an identical long-typed field; add them in a fixed order.
  for (String component :
      new String[] {"years", "months", "days", "hours", "minutes", "seconds", "nanos"}) {
    assembler =
        assembler.name(component).type(SchemaBuilder.builder().longType()).withDefault(0L);
  }
  return assembler.endRecord();
}
}
10 changes: 10 additions & 0 deletions v2/sourcedb-to-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,15 @@
<version>5.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-cassandra</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraFieldMapper;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueExtractor;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueMapper;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider;
import com.google.common.collect.ImmutableMap;
import java.util.Locale;

/** Represent Unified type mapping, value extractor and value mappings for Cassandra. */
@AutoValue
public abstract class CassandraMappings {
  /** Maps an upper-cased Cassandra type name to its unified type mapping. */
  public abstract ImmutableMap<String, UnifiedTypeMapping> typeMapping();

  /** Maps an upper-cased Cassandra type name to its row-value extractor/mapper pair. */
  public abstract ImmutableMap<String, CassandraFieldMapper<?>> fieldMapping();

  public static Builder builder() {
    return new AutoValue_CassandraMappings.Builder();
  }

  /** Builder for {@link CassandraMappings}. */
  @AutoValue.Builder
  public abstract static class Builder {
    abstract ImmutableMap.Builder<String, UnifiedTypeMapping> typeMappingBuilder();

    abstract ImmutableMap.Builder<String, CassandraFieldMapper<?>> fieldMappingBuilder();

    /**
     * Registers the unified type mapping and the value extractor/mapper for a Cassandra type.
     *
     * @param cassandraType Cassandra type name; keyed case-insensitively via upper-casing
     * @param type unified mapping type for the Cassandra type
     * @param rowValueExtractor extracts the raw value from a Cassandra row
     * @param rowValueMapper maps the extracted value into the unified representation
     * @return this builder, for chaining
     */
    public <T> Builder put(
        String cassandraType,
        UnifiedMappingProvider.Type type,
        CassandraRowValueExtractor<T> rowValueExtractor,
        CassandraRowValueMapper<T> rowValueMapper) {
      // Use Locale.ROOT so key canonicalization is independent of the default locale
      // (e.g. the Turkish dotless-i would otherwise corrupt lookups for types like "INT").
      String key = cassandraType.toUpperCase(Locale.ROOT);
      this.typeMappingBuilder().put(key, UnifiedMappingProvider.getMapping(type));
      this.fieldMappingBuilder()
          .put(key, CassandraFieldMapper.create(rowValueExtractor, rowValueMapper));
      return this;
    }

    public abstract CassandraMappings build();
  }
}
Loading

0 comments on commit cd28d01

Please sign in to comment.