
fix: Prevent Oracle LOB columns from causing exceptions in Column and Row validation #1339

Status: Closed (24 commits).

Commits
eddd32d
tests: Add some failing tests so I can work on adding JSON support
nj1973 Nov 19, 2024
8027818
Merge remote-tracking branch 'origin/develop' into 1335-postgresql-js…
nj1973 Nov 21, 2024
ba32ab7
fix: Prevent exceptions caused by COUNT(Oracle LOB)
nj1973 Nov 21, 2024
7772e12
fix: Prevent Oracle LOB columns from causing exceptions in Column and…
nj1973 Nov 21, 2024
267981d
fix: Prevent Oracle LOB columns from causing exceptions in Column and…
nj1973 Nov 21, 2024
ec0c5fe
fix: Prevent Oracle LOB columns from causing exceptions in Column and…
nj1973 Nov 21, 2024
936865b
fix: Prevent Oracle LOB columns from causing exceptions in Column and…
nj1973 Nov 22, 2024
816f6ff
Merge branch 'develop' into 1335-postgresql-jsonb-data-type-support
nj1973 Nov 25, 2024
2a4fa01
Update data_validation/config_manager.py
nj1973 Nov 25, 2024
83d4b6a
Update data_validation/config_manager.py
nj1973 Nov 25, 2024
38722f7
tests: Don't use Python 3.11 for Oracle integration tests (#1346)
nj1973 Nov 25, 2024
78e1a54
tests: Add dvt_binary column validation tests
nj1973 Nov 26, 2024
01b6889
chore: PR comment
nj1973 Nov 26, 2024
3b76106
chore: PR comment
nj1973 Nov 26, 2024
ff05c37
tests: Skip DB2 dvt_binary
nj1973 Nov 26, 2024
be7f99d
chore: PR comment
nj1973 Nov 26, 2024
7afff0f
feat: Support customer defined api endpoint for BigQuery and Spanner …
nj1973 Nov 26, 2024
3225249
chore: Create SECURITY.md (#1352)
helensilva14 Nov 26, 2024
c1b18dc
Update data_validation/config_manager.py
nj1973 Nov 26, 2024
f4f537d
chore: Create CODE_OF_CONDUCT.md (#1351)
helensilva14 Nov 26, 2024
973ad63
chore: Add sample files using DVT for object reconciliation (#1359)
nj1973 Dec 2, 2024
b38dc87
docs: Refine connection secret support section (#1358)
nj1973 Dec 2, 2024
26e0e4e
Merge branch 'develop' into 1335-postgresql-jsonb-data-type-support
helensilva14 Dec 2, 2024
b3b5636
Merge branch 'develop' into 1335-postgresql-jsonb-data-type-support
nj1973 Dec 3, 2024
94 changes: 85 additions & 9 deletions data_validation/config_manager.py
@@ -66,6 +66,9 @@ def __init__(self, config, source_client=None, target_client=None, verbose=False
if self.validation_type not in consts.CONFIG_TYPES:
raise ValueError(f"Unknown Configuration Type: {self.validation_type}")
self._comparison_max_col_length = None
# For some engines we need to know the actual raw data type rather than the Ibis canonical type.
self._source_raw_data_types = None
self._target_raw_data_types = None

@property
def config(self):
@@ -842,7 +845,12 @@ def _key_column_needs_casting_to_string(
)

def build_config_column_aggregates(
self, agg_type, arg_value, exclude_cols, supported_types, cast_to_bigint=False
self,
agg_type: str,
arg_value: list,
exclude_cols: bool,
supported_types: list,
cast_to_bigint=False,
):
"""Return list of aggregate objects of given agg_type."""

@@ -858,13 +866,7 @@ def require_pre_agg_calc_field(
]:
return True
elif column_type in ["binary", "!binary"]:
if agg_type == "count":
# Oracle BLOB is invalid for use with SQL COUNT function.
return bool(
self.source_client.name == "oracle"
or self.target_client.name == "oracle"
)
else:
if agg_type != "count":
# Convert to length for any min/max/sum on binary columns.
return True
elif cast_to_bigint and column_type in ["int32", "!int32"]:
@@ -946,6 +948,16 @@ def require_pre_agg_calc_field(
f"Skipping {agg_type} on {column} due to data type: {column_type}"
)
continue
# Oracle LOB columns are invalid for use with SQL COUNT function.
elif agg_type == "count" and self._is_oracle_lob(column):
# TODO Eventually we need to allow COUNT(Oracle LOB) by using a CASE expression:
# COUNT(CASE WHEN clob_col IS NULL THEN NULL ELSE 1 END)
[Review thread]

Sundar Mudupalli (Collaborator): Why skip this column when you can change the query builder here, using the ibis.cases method? Isn't that a simpler and better solution? Thanks.

nj1973 (Contributor, Author): I don't think I can skip it in the query builder because I don't have data type information for both the source and target. I believe ConfigManager is where I have all of the information required to exclude a column from both source and target queries.

Collaborator: Hi! Based on your discussion during the sync call, is this comment going to be resolved as-is or will there be code changes related to it?

# For now we skip them.
if self.verbose:
logging.info(
f"Skipping {agg_type} on {column} due to Oracle LOB data type"
)
continue

if require_pre_agg_calc_field(
column_type, target_column_type, agg_type, cast_to_bigint
Expand Down Expand Up @@ -1010,6 +1022,69 @@ def _get_comparison_max_col_length(self) -> int:
)
return self._comparison_max_col_length

def _get_source_raw_data_types(self) -> dict:
"""Return a dict of column name: raw data type.

The raw data type is the source/target engine type, for example it might
be "NCLOB" when the Ibis type states "string"."""
if self._source_raw_data_types is None:
if clients.is_oracle_client(self.source_client):
raw_data_types = self.source_client.raw_column_metadata(
database=self.source_schema,
table=self.source_table,
query=self.source_query,
)
self._source_raw_data_types = {
_.casefold(): raw_data_types[_] for _ in raw_data_types
}
else:
self._source_raw_data_types = {}
return self._source_raw_data_types

def _get_target_raw_data_types(self) -> dict:
"""Return a dict of column name: raw data type.

The raw data type is the source/target engine type, for example it might
be "NCLOB" when the Ibis type states "string"."""
if self._target_raw_data_types is None:
if clients.is_oracle_client(self.target_client):
raw_data_types = self.target_client.raw_column_metadata(
database=self.target_schema,
table=self.target_table,
query=self.target_query,
)
self._target_raw_data_types = {
_.casefold(): raw_data_types[_] for _ in raw_data_types
}
else:
self._target_raw_data_types = {}
return self._target_raw_data_types

def _is_oracle_lob(self, casefold_column_name: str) -> bool:
"""Checks if a column in either the source or target database is an Oracle LOB data type.

Args:
casefold_column_name: The case-insensitive name of the column to check.
"""
return bool(
self._get_source_raw_data_types()
.get(casefold_column_name, "")
.endswith("LOB")
or self._get_target_raw_data_types()
.get(casefold_column_name, "")
.endswith("LOB")
)
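# Illustrative sketch only (hypothetical metadata, not part of the PR):
# if _get_source_raw_data_types() returned {"id": "NUMBER", "col_clob": "CLOB"},
# then _is_oracle_lob("col_clob") is True because "CLOB".endswith("LOB"),
# while _is_oracle_lob("id") is False.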

def _exclude_oracle_lob_columns(self, casefold_column_names: list):
"""Remove LOB columns from validation to avoid ORA-00932 errors."""
oracle_lob_cols = [_ for _ in casefold_column_names if self._is_oracle_lob(_)]
if oracle_lob_cols:
if self.verbose:
logging.info(f"Skipping Oracle LOB columns: {str(oracle_lob_cols)}")
return [_ for _ in casefold_column_names if _ not in oracle_lob_cols]
else:
return casefold_column_names

def _strftime_format(
self, column_type: Union[dt.Date, dt.Timestamp], client
) -> str:
@@ -1124,7 +1199,8 @@ def build_dependent_aliases(
col_names = []
for i, calc in enumerate(self._get_order_of_operations(calc_type)):
if i == 0:
previous_level = [x for x in casefold_source_columns.keys()]
previous_level = [_ for _ in casefold_source_columns.keys()]
previous_level = self._exclude_oracle_lob_columns(previous_level)
else:
previous_level = [k for k, v in column_aliases.items() if v == i - 1]
if calc in ["concat", "hash"]:
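For context, a minimal SQL sketch of the Oracle failure mode this change guards against, together with the CASE-based workaround described in the TODO and review thread above. The lob_demo table and doc column are hypothetical names for illustration; the ORA-00932 behaviour and the rewrite pattern come from the comments in the diff.

-- Hypothetical demo table with a LOB column.
CREATE TABLE lob_demo (id NUMBER, doc CLOB);

-- Aggregating a LOB directly fails:
--   SELECT COUNT(doc) FROM lob_demo;
--   => ORA-00932: inconsistent datatypes: expected - got CLOB

-- Workaround from the TODO: count non-NULL rows via CASE so the LOB
-- value itself never reaches the aggregate.
SELECT COUNT(CASE WHEN doc IS NULL THEN NULL ELSE 1 END) FROM lob_demo;

For now the PR skips LOB columns rather than emitting this rewrite because, as the author notes above, ConfigManager is the only place that sees both source and target data types.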
9 changes: 9 additions & 0 deletions tests/resources/oracle_test_tables.sql
@@ -105,7 +105,13 @@ CREATE TABLE pso_data_validator.dvt_ora2pg_types
, col_blob BLOB
, col_clob CLOB
, col_nclob NCLOB
, col_json CLOB
, col_jsonb CLOB
);
ALTER TABLE pso_data_validator.dvt_ora2pg_types
ADD CONSTRAINT dvt_ora2pg_types_chk1 CHECK (col_json IS JSON) ENABLE;
ALTER TABLE pso_data_validator.dvt_ora2pg_types
ADD CONSTRAINT dvt_ora2pg_types_chk2 CHECK (col_jsonb IS JSON) ENABLE;
COMMENT ON TABLE pso_data_validator.dvt_ora2pg_types IS 'Oracle to PostgreSQL integration test table';

-- Literals below match corresponding table in postgresql_test_tables.sql
@@ -121,6 +127,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '1 2:03:44.0' DAY TO SECOND(3)
,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT')
,UTL_RAW.CAST_TO_RAW('DVT'),'DVT A','DVT A'
,'{"dvt": 123, "status": "abc"}','{"dvt": 123, "status": "abc"}'
);
INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
(2,2222,123456789,123456789012345678,1234567890123456789012345
@@ -134,6 +141,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '2 3:04:55.666' DAY TO SECOND(3)
,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT DVT')
,UTL_RAW.CAST_TO_RAW('DVT DVT'),'DVT B','DVT B'
,'{"dvt": 234, "status": "def"}','{"dvt": 234, "status": "def"}'
);
INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
(3,3333,123456789,123456789012345678,1234567890123456789012345
@@ -147,6 +155,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '3 4:05:06.7' DAY TO SECOND(3)
,UTL_RAW.CAST_TO_RAW('DVT'),UTL_RAW.CAST_TO_RAW('DVT DVT DVT')
,UTL_RAW.CAST_TO_RAW('DVT DVT DVT'),'DVT C','DVT C'
,'{"dvt": 345, "status": "ghi"}','{"dvt": 345, "status": "ghi"}'
);
COMMIT;

9 changes: 7 additions & 2 deletions tests/resources/postgresql_test_tables.sql
@@ -79,6 +79,8 @@ CREATE TABLE pso_data_validator.dvt_ora2pg_types
, col_blob bytea
, col_clob text
, col_nclob text
, col_json json
, col_jsonb jsonb
);
COMMENT ON TABLE pso_data_validator.dvt_ora2pg_types IS 'Oracle to PostgreSQL integration test table';

@@ -94,7 +96,8 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,TIMESTAMP WITH TIME ZONE'1970-01-01 00:00:01.123456 +00:00'
,INTERVAL '1 2:03:44.0' DAY TO SECOND(3)
,CAST('DVT' AS BYTEA),CAST('DVT' AS BYTEA)
,CAST('DVT' AS BYTEA),'DVT A','DVT A')
,CAST('DVT' AS BYTEA),'DVT A','DVT A'
,'{"dvt": 123, "status": "abc"}','{"dvt": 123, "status": "abc"}')
,(2,2222,123456789,123456789012345678,1234567890123456789012345
,123.12,123.11
--,123400,0.002
@@ -105,7 +108,8 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,TIMESTAMP WITH TIME ZONE'1970-01-02 00:00:02.123456 -02:00'
,INTERVAL '2 3:04:55.666' DAY TO SECOND(3)
,CAST('DVT' AS BYTEA),CAST('DVT DVT' AS BYTEA)
,CAST('DVT DVT' AS BYTEA),'DVT B','DVT B')
,CAST('DVT DVT' AS BYTEA),'DVT B','DVT B'
,'{"dvt": 234, "status": "def"}','{"dvt": 234, "status": "def"}')
,(3,3333,123456789,123456789012345678,1234567890123456789012345
,123.123,123.11
--,123400,0.003
@@ -117,6 +121,7 @@ INSERT INTO pso_data_validator.dvt_ora2pg_types VALUES
,INTERVAL '3 4:05:06.7' DAY TO SECOND(3)
,CAST('DVT' AS BYTEA),CAST('DVT DVT DVT' AS BYTEA)
,CAST('DVT DVT DVT' AS BYTEA),'DVT C','DVT C'
,'{"dvt": 345, "status": "ghi"}','{"dvt": 345, "status": "ghi"}'
);

/* Following table used for validating generating table partitions */
21 changes: 21 additions & 0 deletions tests/system/data_sources/test_db2.py
@@ -13,6 +13,7 @@
# limitations under the License.

import os
import pytest
from unittest import mock

from data_validation import cli_tools
@@ -137,6 +138,26 @@ def test_column_validation_core_types_to_bigquery():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_column_validation_dvt_binary_to_bigquery():
"""DB2 to BigQuery dvt_binary column validation"""
# Remove skip of this test when working on issue-1354
pytest.skip(
"Skipping test_column_validation_dvt_binary_to_bigquery until issue-1354 is actioned"
)
column_validation_test(
tc="bq-conn",
tables="db2inst1.dvt_binary=pso_data_validator.dvt_binary",
count_cols="*",
sum_cols="*",
min_cols="*",
max_cols="*",
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
16 changes: 16 additions & 0 deletions tests/system/data_sources/test_mysql.py
@@ -246,6 +246,22 @@ def test_column_validation_core_types_to_bigquery():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_column_validation_dvt_binary_to_bigquery():
"""MySQL to BigQuery dvt_binary column validation"""
column_validation_test(
tc="bq-conn",
tables="pso_data_validator.dvt_binary",
count_cols="*",
sum_cols="*",
min_cols="*",
max_cols="*",
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
28 changes: 21 additions & 7 deletions tests/system/data_sources/test_oracle.py
@@ -100,6 +100,8 @@
"col_blob",
"col_clob",
"col_nclob",
"col_json",
"col_jsonb",
]


@@ -238,8 +240,10 @@ def test_schema_validation_not_null_vs_nullable():
def test_schema_validation_oracle_to_postgres():
"""Oracle to PostgreSQL schema validation"""
schema_validation_test(
tables="pso_data_validator.dvt_core_types",
tables="pso_data_validator.dvt_ora2pg_types",
tc="pg-conn",
# TODO We should be able to remove allow_list below when actioning issue-1338.
allow_list="string:json",
allow_list_file="samples/allow_list/oracle_to_postgres.yaml",
)

@@ -313,6 +317,22 @@ def test_column_validation_oracle_to_postgres():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_column_validation_dvt_binary_to_bigquery():
"""Oracle to BigQuery dvt_binary column validation"""
column_validation_test(
tc="bq-conn",
tables="pso_data_validator.dvt_binary",
count_cols="*",
sum_cols="*",
min_cols="*",
max_cols="*",
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
@@ -382,9 +402,6 @@ def test_row_validation_oracle_to_postgres():
for _ in ORA2PG_COLUMNS
if _
not in (
"col_blob",
"col_clob",
"col_nclob",
"col_raw",
"col_long_raw",
"col_float32",
@@ -600,9 +617,6 @@ def test_custom_query_row_validation_oracle_to_postgres():
for _ in ORA2PG_COLUMNS
if _
not in (
"col_blob",
"col_clob",
"col_nclob",
"col_raw",
"col_long_raw",
"col_float32",
16 changes: 16 additions & 0 deletions tests/system/data_sources/test_postgres.py
@@ -663,6 +663,22 @@ def test_column_validation_core_types_to_bigquery():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_column_validation_dvt_binary_to_bigquery():
"""PostgreSQL to BigQuery dvt_binary column validation"""
column_validation_test(
tc="bq-conn",
tables="pso_data_validator.dvt_binary",
count_cols="*",
sum_cols="*",
min_cols="*",
max_cols="*",
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
16 changes: 16 additions & 0 deletions tests/system/data_sources/test_sql_server.py
@@ -337,6 +337,22 @@ def test_column_validation_core_types_to_bigquery():
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_column_validation_dvt_binary_to_bigquery():
"""SQL Server to BigQuery dvt_binary column validation"""
column_validation_test(
tc="bq-conn",
tables="pso_data_validator.dvt_binary",
count_cols="*",
sum_cols="*",
min_cols="*",
max_cols="*",
)


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,