feat: Prevent column validation exceptions caused by Oracle CLOB JSON columns #1365

Merged (8 commits, Dec 12, 2024)
45 changes: 34 additions & 11 deletions data_validation/config_manager.py
@@ -748,11 +748,32 @@ def build_and_append_pre_agg_calc_config(
return calculated_config

def append_pre_agg_calc_field(
self, source_column, target_column, agg_type, column_type, column_position
self,
source_column: str,
target_column: str,
agg_type: str,
column_type: str,
target_column_type: str,
column_position: int,
) -> dict:
"""Append calculated field for length(string | binary) or epoch_seconds(timestamp) for preprocessing before column validation aggregation."""
"""Append calculated field for length() or epoch_seconds(timestamp) for preprocessing before column validation aggregation."""
depth, cast_type = 0, None
if column_type in ["string", "!string"]:
if any(_ in ["json", "!json"] for _ in [column_type, target_column_type]):
# JSON data needs casting to string before we apply a length function.
pre_calculated_config = self.build_and_append_pre_agg_calc_config(
source_column,
target_column,
"cast",
column_position,
"string",
depth,
)
source_column = target_column = pre_calculated_config[
consts.CONFIG_FIELD_ALIAS
]
depth = 1
calc_func = "length"
elif column_type in ["string", "!string"]:
calc_func = "length"

elif column_type in ["binary", "!binary"]:
@@ -763,14 +784,12 @@ def append_pre_agg_calc_field(
self.source_client.name == "bigquery"
or self.target_client.name == "bigquery"
):
calc_func = "cast"
cast_type = "timestamp"
pre_calculated_config = self.build_and_append_pre_agg_calc_config(
source_column,
target_column,
calc_func,
"cast",
column_position,
cast_type,
"timestamp",
depth,
)
source_column = target_column = pre_calculated_config[
@@ -852,14 +871,17 @@ def require_pre_agg_calc_field(
agg_type: str,
cast_to_bigint: bool,
) -> bool:
if column_type in ["string", "!string"] and target_column_type in [
"string",
"!string",
]:
if all(
_ in ["string", "!string", "json", "!json"]
for _ in [column_type, target_column_type]
):
# These data types are aggregated using their lengths.
return True
elif column_type in ["binary", "!binary"]:
if agg_type == "count":
# Oracle BLOB is invalid for use with the SQL COUNT function.
# The expression below returns True if either client is Oracle,
# which has the effect of triggering the byte_length transformation.
return bool(
self.source_client.name == "oracle"
or self.target_client.name == "oracle"
@@ -955,6 +977,7 @@ def require_pre_agg_calc_field(
casefold_target_columns[column],
agg_type,
column_type,
target_column_type,
column_position,
)
else:
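Read together, the hunks above make json/!json columns validate by length: require_pre_agg_calc_field now returns True for any mix of string and json types across source and target, and append_pre_agg_calc_field prepends a cast step before the length function. Below is a minimal sketch of the resulting two-step calculated-field chain; the dict keys are illustrative stand-ins, not the exact constants from consts.

from typing import List

def json_pre_agg_chain(column: str) -> List[dict]:
    """Sketch: ordered calculated fields for one JSON column."""
    cast_field = {
        "calc_type": "cast",  # step 1: JSON/CLOB -> string
        "source_calculated_columns": [column],
        "target_calculated_columns": [column],
        "field_alias": f"cast__{column}",  # stand-in for the consts.CONFIG_FIELD_ALIAS value
        "cast_datatype": "string",
        "depth": 0,
    }
    length_field = {
        "calc_type": "length",  # step 2: string -> length, safe for SUM/MIN/MAX
        "source_calculated_columns": [cast_field["field_alias"]],
        "target_calculated_columns": [cast_field["field_alias"]],
        "field_alias": f"length__cast__{column}",
        "depth": 1,
    }
    return [cast_field, length_field]

print(json_pre_agg_chain("col_json"))

The depth-0 cast's alias feeds the depth-1 length(), so both source and target aggregate over string lengths rather than raw CLOB/JSON values.
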
9 changes: 9 additions & 0 deletions tests/resources/oracle_test_tables.sql
@@ -105,7 +105,13 @@ CREATE TABLE pso_data_validator.dvt_ora2pg_types
, col_blob BLOB
, col_clob CLOB
, col_nclob NCLOB
, col_json CLOB
, col_jsonb CLOB
);
ALTER TABLE pso_data_validator.dvt_ora2pg_types
ADD CONSTRAINT dvt_ora2pg_types_chk1 CHECK (col_json IS JSON) ENABLE;
ALTER TABLE pso_data_validator.dvt_ora2pg_types
ADD CONSTRAINT dvt_ora2pg_types_chk2 CHECK (col_jsonb IS JSON) ENABLE;
COMMENT ON TABLE pso_data_validator.dvt_ora2pg_types IS 'Oracle to PostgreSQL integration test table';

-- Literals below match corresponding table in postgresql_test_tables.sql
@@ -121,6 +127,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '1 2:03:44.0' DAY TO SECOND(3)
,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT')
,UTL_RAW.CAST_TO_RAW('DVT'),'DVT A','DVT A'
,'{"dvt": 123, "status": "abc"}','{"dvt": 123, "status": "abc"}'
);
INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
(2,2222,123456789,123456789012345678,1234567890123456789012345
@@ -134,6 +141,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '2 3:04:55.666' DAY TO SECOND(3)
,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT DVT')
,UTL_RAW.CAST_TO_RAW('DVT DVT'),'DVT B','DVT B'
,'{"dvt": 234, "status": "def"}','{"dvt": 234, "status": "def"}'
);
INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
(3,3333,123456789,123456789012345678,1234567890123456789012345
@@ -147,6 +155,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '3 4:05:06.7' DAY TO SECOND(3)
,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT DVT DVT')
,UTL_RAW.CAST_TO_RAW('DVT DVT DVT'),'DVT C','DVT C'
,'{"dvt": 345, "status": "ghi"}','{"dvt": 345, "status": "ghi"}'
);
COMMIT;

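The IS JSON check constraints do double duty: they validate inserted documents and they are what make these CLOBs discoverable as JSON columns by clients. A hedged way to confirm the discovery side from Python, assuming python-oracledb, placeholder credentials, and an Oracle version that ships the ALL_JSON_COLUMNS dictionary view:

import oracledb

# Placeholder credentials/DSN; adjust for your environment.
conn = oracledb.connect(
    user="pso_data_validator", password="...", dsn="localhost/orclpdb1"
)
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT column_name, data_type
        FROM all_json_columns
        WHERE owner = 'PSO_DATA_VALIDATOR'
          AND table_name = 'DVT_ORA2PG_TYPES'
        """
    )
    for column_name, data_type in cur:
        # Expect COL_JSON and COL_JSONB, both stored as CLOB.
        print(column_name, data_type)
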
9 changes: 7 additions & 2 deletions tests/resources/postgresql_test_tables.sql
@@ -79,6 +79,8 @@ CREATE TABLE pso_data_validator.dvt_ora2pg_types
, col_blob bytea
, col_clob text
, col_nclob text
, col_json json
, col_jsonb jsonb
);
COMMENT ON TABLE pso_data_validator.dvt_ora2pg_types IS 'Oracle to PostgreSQL integration test table';

@@ -94,7 +96,8 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,TIMESTAMP WITH TIME ZONE'1970-01-01 00:00:01.123456 +00:00'
,INTERVAL '1 2:03:44.0' DAY TO SECOND(3)
,CAST('DVT' AS BYTEA),CAST('DVT' AS BYTEA)
,CAST('DVT' AS BYTEA),'DVT A','DVT A')
,CAST('DVT' AS BYTEA),'DVT A','DVT A'
,'{"dvt": 123, "status": "abc"}','{"dvt": 123, "status": "abc"}')
,(2,2222,123456789,123456789012345678,1234567890123456789012345
,123.12,123.11
--,123400,0.002
@@ -105,7 +108,8 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,TIMESTAMP WITH TIME ZONE'1970-01-02 00:00:02.123456 -02:00'
,INTERVAL '2 3:04:55.666' DAY TO SECOND(3)
,CAST('DVT' AS BYTEA),CAST('DVT DVT' AS BYTEA)
,CAST('DVT DVT' AS BYTEA),'DVT B','DVT B')
,CAST('DVT DVT' AS BYTEA),'DVT B','DVT B'
,'{"dvt": 234, "status": "def"}','{"dvt": 234, "status": "def"}')
,(3,3333,123456789,123456789012345678,1234567890123456789012345
,123.123,123.11
--,123400,0.003
@@ -117,6 +121,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '3 4:05:06.7' DAY TO SECOND(3)
,CAST('DVT' AS BYTEA),CAST('DVT DVT DVT' AS BYTEA)
,CAST('DVT DVT DVT' AS BYTEA),'DVT C','DVT C'
,'{"dvt": 345, "status": "ghi"}','{"dvt": 345, "status": "ghi"}'
);

/* Following table used for validating generating table partitions */
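Note that json preserves the inserted literal verbatim while jsonb stores a parsed value and regenerates canonical text on output, so a length-based aggregation can legitimately differ between the two columns for equivalent documents. A small sketch to eyeball both lengths, assuming psycopg2 and placeholder connection details:

import psycopg2

# Placeholder DSN; adjust for your environment.
conn = psycopg2.connect("dbname=postgres user=postgres")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT length(col_json::text) AS json_len,
               length(col_jsonb::text) AS jsonb_len
        FROM pso_data_validator.dvt_ora2pg_types
        ORDER BY 1
        """
    )
    for json_len, jsonb_len in cur.fetchall():
        print(json_len, jsonb_len)

The literals above appear to already be in jsonb's canonical spacing and key order, so the two lengths should agree for these test rows.
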
10 changes: 8 additions & 2 deletions tests/system/data_sources/test_oracle.py
@@ -100,6 +100,8 @@
"col_blob",
"col_clob",
"col_nclob",
"col_json",
"col_jsonb",
]


@@ -374,8 +376,8 @@ def test_row_validation_oracle_to_postgres():
# TODO col_raw/col_long_raw are blocked by issue-773 (is it even reasonable to expect binary columns to work here?)
# TODO Change hash_cols below to include col_nvarchar_30,col_nchar_2 when issue-772 is complete.
# TODO Change hash_cols below to include col_interval_ds when issue-1214 is complete.
# TODO Change hash_cols below to include col_clob/col_nclob/col_blob/col_json/col_jsonb when issue-1364 is complete.
# Excluded col_float32,col_float64 due to the lossy nature of BINARY_FLOAT/DOUBLE.
# Excluded CLOB/NCLOB/BLOB columns because lob values cannot be concatenated
hash_cols = ",".join(
[
_
Expand All @@ -393,6 +395,8 @@ def test_row_validation_oracle_to_postgres():
"col_nvarchar_30",
"col_nchar_2",
"col_interval_ds",
"col_json",
"col_jsonb",
)
]
)
@@ -592,8 +596,8 @@ def test_custom_query_row_validation_oracle_to_postgres():
# TODO col_raw/col_long_raw are blocked by issue-773 (is it even reasonable to expect binary columns to work here?)
# TODO Change hash_cols below to include col_nvarchar_30,col_nchar_2 when issue-772 is complete.
# TODO Change hash_cols below to include col_interval_ds when issue-1214 is complete.
# TODO Change hash_cols below to include col_clob/col_nclob/col_blob/col_json/col_jsonb when issue-1364 is complete.
# Excluded col_float32,col_float64 due to the lossy nature of BINARY_FLOAT/DOUBLE.
# Excluded CLOB/NCLOB/BLOB columns because lob values cannot be concatenated
hash_cols = ",".join(
[
_
Expand All @@ -611,6 +615,8 @@ def test_custom_query_row_validation_oracle_to_postgres():
"col_nvarchar_30",
"col_nchar_2",
"col_interval_ds",
"col_json",
"col_jsonb",
)
]
)
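For reference, the hash_cols comprehension used in both tests filters a shared expected-column list against an exclusion tuple; spelled out standalone below (column list abridged, exclusions and issue numbers copied from the comments above):

# Standalone restatement of the hash_cols filtering in the tests above.
ORA2PG_COLUMNS = ["id", "col_clob", "col_nclob", "col_blob", "col_json", "col_jsonb"]  # abridged

EXCLUDED_FROM_HASH = {
    "col_float32", "col_float64",      # lossy BINARY_FLOAT/DOUBLE
    "col_raw", "col_long_raw",         # issue-773
    "col_nvarchar_30", "col_nchar_2",  # issue-772
    "col_interval_ds",                 # issue-1214
    "col_clob", "col_nclob", "col_blob",
    "col_json", "col_jsonb",           # issue-1364
}

hash_cols = ",".join(c for c in ORA2PG_COLUMNS if c not in EXCLUDED_FROM_HASH)
print(hash_cols)  # -> "id" for the abridged list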