Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence Component: Split Size Change User Capability into Length and Scale #2943

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum SchemaEvolutionCapability
{
ADD_COLUMN,
DATA_TYPE_CONVERSION,
DATA_TYPE_SIZE_CHANGE,
DATA_TYPE_LENGTH_CHANGE,
DATA_TYPE_SCALE_CHANGE,
COLUMN_NULLABILITY_CHANGE
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private void evolveDataType(Field newField, Field mainDataField, Dataset mainDat
if (!Objects.equals(mainDataField.type().length(), newField.type().length()))
{
if (!sink.capabilities().contains(Capability.DATA_TYPE_LENGTH_CHANGE)
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE)))
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE)))
{
throw new IncompatibleSchemaChangeException(String.format("Data type length changes couldn't be performed on column \"%s\" since sink/user capability does not allow it", newField.name()));
}
Expand All @@ -240,7 +240,7 @@ private void evolveDataType(Field newField, Field mainDataField, Dataset mainDat
if (!Objects.equals(mainDataField.type().scale(), newField.type().scale()))
{
if (!sink.capabilities().contains(Capability.DATA_TYPE_SCALE_CHANGE)
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE)))
|| (!schemaEvolutionCapabilitySet.contains(SchemaEvolutionCapability.DATA_TYPE_SCALE_CHANGE)))
{
throw new IncompatibleSchemaChangeException(String.format("Data type scale changes couldn't be performed on column \"%s\" since sink/user capability does not allow it", newField.name()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,7 @@ void testSnapshotMilestoningWithAddColumnWithoutUserProvidedSchemaEvolutionCapab
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

// Use the planner utils to return the sql
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(expectedSchemaEvolutionAddColumnWithUpperCase, sqlsForSchemaEvolution.get(0));
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand All @@ -218,7 +214,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -248,7 +244,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionWithUpperCase()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -281,12 +277,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionAndUserProvidedSchema
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(expectedSchemaEvolutionModifySize, sqlsForSchemaEvolution.get(0));

Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand All @@ -310,7 +301,7 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SCALE_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, false);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -344,8 +335,6 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolutionAndUserProvidedSchemaE
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
Expand Down Expand Up @@ -430,10 +419,7 @@ void testSnapshotMilestoningWithAlterNullabilityWithoutUserCapability()
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(2, sqlsForSchemaEvolution.size());
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand Down Expand Up @@ -521,11 +507,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndUserProvidedSchem
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.assertEquals(expectedSchemaNonBreakingChange, sqlsForSchemaEvolution.get(0));
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand Down Expand Up @@ -556,12 +538,7 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndSizingChange()
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();

Assertions.assertEquals(expectedSchemaNonBreakingChangeWithSizing, sqlsForSchemaEvolution.get(0));
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand All @@ -587,23 +564,16 @@ void testSnapshotMilestoningWithNonBreakingDataTypeEvolutionAndSizingChangeAllow
NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_CONVERSION);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());

List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();

Assertions.assertEquals(expectedSchemaNonBreakingChangeWithSizing, sqlsForSchemaEvolution.get(0));
}
catch (IncompatibleSchemaChangeException e)
{
Assertions.assertEquals("Data sizing changes couldn't be performed on column \"amount\" since user capability does not allow it", e.getMessage());
}
Assertions.assertEquals(expectedSchemaNonBreakingChangeWithSizing, sqlsForSchemaEvolution.get(0));
}

// Breaking data type change from DOUBLE --> VARCHAR. Throws exception
Expand Down Expand Up @@ -685,9 +655,7 @@ void testSnapshotMilestoningWithColumnMissingInStagingTableWithoutUserCapability
try
{
SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
RelationalTransformer transformer = new RelationalTransformer(relationalSink);
SqlPlan physicalPlanForSchemaEvolution = transformer.generatePhysicalPlan(result.logicalPlan());
List<String> sqlsForSchemaEvolution = physicalPlanForSchemaEvolution.getSqlList();
Assertions.fail("Exception was not thrown");
}
catch (IncompatibleSchemaChangeException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ void testDataTypeSizeChange() throws Exception
.build();

Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);

String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void testDataTypeSizeChange() throws Exception

PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).enableSchemaEvolution(true).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
Datasets datasets = Datasets.of(mainTable, stagingTable);

String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName};
Expand Down Expand Up @@ -331,7 +331,8 @@ void testDataTypeConversionAndDataTypeSizeChange() throws Exception
PlannerOptions options = PlannerOptions.builder().cleanupStagingData(false).collectStatistics(true).enableSchemaEvolution(true).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_CONVERSION);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SCALE_CHANGE);
Datasets datasets = Datasets.of(mainTable, stagingTable);

String[] schema = new String[]{idName, nameName, incomeName, startTimeName, expiryDateName, digestName, batchUpdateTimeName, batchIdName};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -124,7 +124,7 @@ void testSnapshotMilestoningWithColumnLengthChangeEvolutionWithUpperCase()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down Expand Up @@ -184,7 +184,7 @@ void testSnapshotMilestoningWithColumnScaleChangeEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, false);

try
Expand Down Expand Up @@ -244,7 +244,7 @@ void testSnapshotMilestoningWithImplicitDataTypeEvolutionAndLengthEvolution()

NontemporalSnapshot ingestMode = NontemporalSnapshot.builder().auditing(NoAuditing.builder().build()).build();
Set<SchemaEvolutionCapability> schemaEvolutionCapabilitySet = new HashSet<>();
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_SIZE_CHANGE);
schemaEvolutionCapabilitySet.add(SchemaEvolutionCapability.DATA_TYPE_LENGTH_CHANGE);
SchemaEvolution schemaEvolution = new SchemaEvolution(relationalSink, ingestMode, schemaEvolutionCapabilitySet, true);

SchemaEvolutionResult result = schemaEvolution.buildLogicalPlanForSchemaEvolution(mainTable, stagingTable.schema());
Expand Down
Loading