feat: Prevent column validation exceptions caused by Oracle CLOB JSON columns (#1365)

* tests: Add some failing tests so I can work on adding JSON support

* feat: JSON columns are compared using length in column validations, just like string columns

* chore: Typos

* Update data_validation/config_manager.py

Co-authored-by: Helen Cristina <[email protected]>

* Update data_validation/config_manager.py

Co-authored-by: Helen Cristina <[email protected]>

* Update data_validation/config_manager.py

Co-authored-by: Helen Cristina <[email protected]>

---------

Co-authored-by: Helen Cristina <[email protected]>
nj1973 and helensilva14 authored Dec 12, 2024
1 parent 97cfdbd commit b20b4dd
Showing 4 changed files with 58 additions and 15 deletions.
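At its core the fix treats JSON like string for column validations: when either side of the comparison is a JSON type, DVT first appends a cast-to-string calculated field and then the usual length calculation, so the aggregation never touches the raw CLOB. A minimal sketch of that chain follows; the function name and dict shapes are illustrative assumptions, not DVT's exact config format.

def pre_agg_calc_chain(column_type: str, target_column_type: str) -> list:
    """Illustrative: ordered calculated fields applied before SUM/MIN/MAX."""
    chain = []
    depth = 0
    if any(t in ("json", "!json") for t in (column_type, target_column_type)):
        # JSON cannot be fed to length() directly on every engine,
        # so cast to string first, then measure the string.
        chain.append({"calc_type": "cast", "target_type": "string", "depth": depth})
        depth = 1
    if chain or column_type in ("string", "!string"):
        chain.append({"calc_type": "length", "depth": depth})
    return chain

print(pre_agg_calc_chain("!json", "string"))
# [{'calc_type': 'cast', 'target_type': 'string', 'depth': 0},
#  {'calc_type': 'length', 'depth': 1}]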
45 changes: 34 additions & 11 deletions data_validation/config_manager.py
@@ -748,11 +748,32 @@ def build_and_append_pre_agg_calc_config(
         return calculated_config
 
     def append_pre_agg_calc_field(
-        self, source_column, target_column, agg_type, column_type, column_position
+        self,
+        source_column: str,
+        target_column: str,
+        agg_type: str,
+        column_type: str,
+        target_column_type: str,
+        column_position: int,
     ) -> dict:
-        """Append calculated field for length(string | binary) or epoch_seconds(timestamp) for preprocessing before column validation aggregation."""
+        """Append calculated field for length() or epoch_seconds(timestamp) for preprocessing before column validation aggregation."""
         depth, cast_type = 0, None
-        if column_type in ["string", "!string"]:
+        if any(_ in ["json", "!json"] for _ in [column_type, target_column_type]):
+            # JSON data which needs casting to string before we apply a length function.
+            pre_calculated_config = self.build_and_append_pre_agg_calc_config(
+                source_column,
+                target_column,
+                "cast",
+                column_position,
+                "string",
+                depth,
+            )
+            source_column = target_column = pre_calculated_config[
+                consts.CONFIG_FIELD_ALIAS
+            ]
+            depth = 1
+            calc_func = "length"
+        elif column_type in ["string", "!string"]:
             calc_func = "length"
 
         elif column_type in ["binary", "!binary"]:
@@ -763,14 +784,12 @@ def append_pre_agg_calc_field(
                 self.source_client.name == "bigquery"
                 or self.target_client.name == "bigquery"
             ):
-                calc_func = "cast"
-                cast_type = "timestamp"
                 pre_calculated_config = self.build_and_append_pre_agg_calc_config(
                     source_column,
                     target_column,
-                    calc_func,
+                    "cast",
                     column_position,
-                    cast_type,
+                    "timestamp",
                     depth,
                 )
                 source_column = target_column = pre_calculated_config[
@@ -852,14 +871,17 @@ def require_pre_agg_calc_field(
         agg_type: str,
         cast_to_bigint: bool,
     ) -> bool:
-        if column_type in ["string", "!string"] and target_column_type in [
-            "string",
-            "!string",
-        ]:
+        if all(
+            _ in ["string", "!string", "json", "!json"]
+            for _ in [column_type, target_column_type]
+        ):
+            # These data types are aggregated using their lengths.
             return True
         elif column_type in ["binary", "!binary"]:
             if agg_type == "count":
                 # Oracle BLOB is invalid for use with SQL COUNT function.
+                # The expression below returns True if client is Oracle which
+                # has the effect of triggering use of byte_length transformation.
                 return bool(
                     self.source_client.name == "oracle"
                     or self.target_client.name == "oracle"
@@ -955,6 +977,7 @@ def require_pre_agg_calc_field(
                     casefold_target_columns[column],
                     agg_type,
                     column_type,
+                    target_column_type,
                     column_position,
                 )
             else:
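The widened check in require_pre_agg_calc_field above now accepts any mix of string and JSON on the two sides, since both are aggregated via their lengths (the "!" prefix marks the non-nullable variants in Ibis type notation). A hedged stand-alone sketch of that predicate:

LENGTH_AGGREGATED = {"string", "!string", "json", "!json"}

def needs_length_pre_agg(column_type: str, target_column_type: str) -> bool:
    # True when both sides can be compared by length, e.g. Oracle CLOB JSON
    # (typed json) against PostgreSQL text (typed string).
    return all(t in LENGTH_AGGREGATED for t in (column_type, target_column_type))

assert needs_length_pre_agg("!json", "string")
assert not needs_length_pre_agg("!json", "binary")  # mixed JSON/binary falls through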
9 changes: 9 additions & 0 deletions tests/resources/oracle_test_tables.sql
@@ -105,7 +105,13 @@ CREATE TABLE pso_data_validator.dvt_ora2pg_types
 , col_blob BLOB
 , col_clob CLOB
 , col_nclob NCLOB
+, col_json CLOB
+, col_jsonb CLOB
 );
+ALTER TABLE pso_data_validator.dvt_ora2pg_types
+ADD CONSTRAINT dvt_ora2pg_types_chk1 CHECK (col_json IS JSON) ENABLE;
+ALTER TABLE pso_data_validator.dvt_ora2pg_types
+ADD CONSTRAINT dvt_ora2pg_types_chk2 CHECK (col_jsonb IS JSON) ENABLE;
 COMMENT ON TABLE pso_data_validator.dvt_ora2pg_types IS 'Oracle to PostgreSQL integration test table';
 
 -- Literals below match corresponding table in postgresql_test_tables.sql
@@ -121,6 +127,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 ,INTERVAL '1 2:03:44.0' DAY TO SECOND(3)
 ,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT')
 ,UTL_RAW.CAST_TO_RAW('DVT'),'DVT A','DVT A'
+,'{"dvt": 123, "status": "abc"}','{"dvt": 123, "status": "abc"}'
 );
 INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 (2,2222,123456789,123456789012345678,1234567890123456789012345
@@ -134,6 +141,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 ,INTERVAL '2 3:04:55.666' DAY TO SECOND(3)
 ,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT DVT')
 ,UTL_RAW.CAST_TO_RAW('DVT DVT'),'DVT B','DVT B'
+,'{"dvt": 234, "status": "def"}','{"dvt": 234, "status": "def"}'
 );
 INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 (3,3333,123456789,123456789012345678,1234567890123456789012345
@@ -147,6 +155,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 ,INTERVAL '3 4:05:06.7' DAY TO SECOND(3)
 ,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT DVT DVT')
 ,UTL_RAW.CAST_TO_RAW('DVT DVT DVT'),'DVT C','DVT C'
+,'{"dvt": 345, "status": "ghi"}','{"dvt": 345, "status": "ghi"}'
 );
 COMMIT;

9 changes: 7 additions & 2 deletions tests/resources/postgresql_test_tables.sql
@@ -79,6 +79,8 @@ CREATE TABLE pso_data_validator.dvt_ora2pg_types
 , col_blob bytea
 , col_clob text
 , col_nclob text
+, col_json json
+, col_jsonb jsonb
 );
 COMMENT ON TABLE pso_data_validator.dvt_ora2pg_types IS 'Oracle to PostgreSQL integration test table';

@@ -94,7 +96,8 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 ,TIMESTAMP WITH TIME ZONE'1970-01-01 00:00:01.123456 +00:00'
 ,INTERVAL '1 2:03:44.0' DAY TO SECOND(3)
 ,CAST('DVT' AS BYTEA),CAST('DVT' AS BYTEA)
-,CAST('DVT' AS BYTEA),'DVT A','DVT A')
+,CAST('DVT' AS BYTEA),'DVT A','DVT A'
+,'{"dvt": 123, "status": "abc"}','{"dvt": 123, "status": "abc"}')
 ,(2,2222,123456789,123456789012345678,1234567890123456789012345
 ,123.12,123.11
 --,123400,0.002
@@ -105,7 +108,8 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 ,TIMESTAMP WITH TIME ZONE'1970-01-02 00:00:02.123456 -02:00'
 ,INTERVAL '2 3:04:55.666' DAY TO SECOND(3)
 ,CAST('DVT' AS BYTEA),CAST('DVT DVT' AS BYTEA)
-,CAST('DVT DVT' AS BYTEA),'DVT B','DVT B')
+,CAST('DVT DVT' AS BYTEA),'DVT B','DVT B'
+,'{"dvt": 234, "status": "def"}','{"dvt": 234, "status": "def"}')
 ,(3,3333,123456789,123456789012345678,1234567890123456789012345
 ,123.123,123.11
 --,123400,0.003
@@ -117,6 +121,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
 ,INTERVAL '3 4:05:06.7' DAY TO SECOND(3)
 ,CAST('DVT' AS BYTEA),CAST('DVT DVT DVT' AS BYTEA)
 ,CAST('DVT DVT DVT' AS BYTEA),'DVT C','DVT C'
+,'{"dvt": 345, "status": "ghi"}','{"dvt": 345, "status": "ghi"}'
 );
 
 /* Following table used for validating generating table partitions */
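In SQL terms, a sum validation over the new columns reduces to comparing length aggregates on each engine. Roughly the following, though this is illustrative only: the SQL DVT actually generates depends on the dialect and Ibis version, and the column aliases here are guesses.

# Roughly the per-engine aggregates compared for a sum validation of col_json;
# illustrative sketches, not DVT's generated SQL.
ORACLE_SQL = """
SELECT SUM(LENGTH(CAST(col_json AS VARCHAR2(4000)))) AS sum__length__col_json
FROM pso_data_validator.dvt_ora2pg_types
"""
POSTGRES_SQL = """
SELECT SUM(LENGTH(CAST(col_json AS VARCHAR))) AS sum__length__col_json
FROM pso_data_validator.dvt_ora2pg_types
"""

The validation passes when the two aggregates agree.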
10 changes: 8 additions & 2 deletions tests/system/data_sources/test_oracle.py
@@ -100,6 +100,8 @@
     "col_blob",
     "col_clob",
     "col_nclob",
+    "col_json",
+    "col_jsonb",
 ]
 
 
@@ -374,8 +376,8 @@ def test_row_validation_oracle_to_postgres():
     # TODO col_raw/col_long_raw are blocked by issue-773 (is it even reasonable to expect binary columns to work here?)
     # TODO Change hash_cols below to include col_nvarchar_30,col_nchar_2 when issue-772 is complete.
     # TODO Change hash_cols below to include col_interval_ds when issue-1214 is complete.
+    # TODO Change hash_cols below to include col_clob/col_nclob/col_blob/col_json/col_jsonb when issue-1364 is complete.
     # Excluded col_float32,col_float64 due to the lossy nature of BINARY_FLOAT/DOUBLE.
-    # Excluded CLOB/NCLOB/BLOB columns because lob values cannot be concatenated
     hash_cols = ",".join(
         [
             _
@@ -393,6 +395,8 @@ def test_row_validation_oracle_to_postgres():
                 "col_nvarchar_30",
                 "col_nchar_2",
                 "col_interval_ds",
+                "col_json",
+                "col_jsonb",
             )
         ]
     )
@@ -592,8 +596,8 @@ def test_custom_query_row_validation_oracle_to_postgres():
     # TODO col_raw/col_long_raw are blocked by issue-773 (is it even reasonable to expect binary columns to work here?)
     # TODO Change hash_cols below to include col_nvarchar_30,col_nchar_2 when issue-772 is complete.
     # TODO Change hash_cols below to include col_interval_ds when issue-1214 is complete.
+    # TODO Change hash_cols below to include col_clob/col_nclob/col_blob/col_json/col_jsonb when issue-1364 is complete.
     # Excluded col_float32,col_float64 due to the lossy nature of BINARY_FLOAT/DOUBLE.
-    # Excluded CLOB/NCLOB/BLOB columns because lob values cannot be concatenated
     hash_cols = ",".join(
         [
             _
@@ -611,6 +615,8 @@ def test_custom_query_row_validation_oracle_to_postgres():
                 "col_nvarchar_30",
                 "col_nchar_2",
                 "col_interval_ds",
+                "col_json",
+                "col_jsonb",
             )
         ]
     )
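For an end-to-end check, the column validation these system tests automate can also be run by hand through the DVT CLI; a hypothetical invocation, with ora_conn and pg_conn as placeholder connection names:

import subprocess

# Hypothetical run of the DVT CLI against the integration test table;
# ora_conn/pg_conn must be pre-configured DVT connections.
subprocess.run(
    [
        "data-validation", "validate", "column",
        "-sc", "ora_conn",
        "-tc", "pg_conn",
        "-tbls", "pso_data_validator.dvt_ora2pg_types",
        "--sum", "col_json,col_jsonb",
    ],
    check=True,
)

Before this commit, a run like this raised a column validation exception for the Oracle CLOB JSON columns; with the cast-to-string pre-aggregation it completes like any string column validation.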
