Merge pull request #44 from scale-vector/rfix/improves-json-normalizer
adds a nesting limit, list-of-lists normalization, and type autodetection to the JSON normalizer
rudolfix authored Jun 29, 2022
2 parents 598f0f9 + f6d55d0 commit 87ae38e
Showing 30 changed files with 385 additions and 165 deletions.
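
For orientation before the per-file diffs: the JSON normalizer now reads an optional max_nesting limit from its config and stops flattening once recursion reaches that depth. Below is a minimal Python sketch of the config shape; the field names come from JSONNormalizerConfig in dlt/common/normalizers/json/relational.py further down, while where this dict is attached to a schema, and the propagation fields, are not visible in this diff and are left out of the sketch.

    # illustrative only: field names taken from JSONNormalizerConfig in this PR
    json_normalizer_config = {
        "generate_dlt_id": True,   # Optional[bool]
        "max_nesting": 2,          # Optional[int], new in this PR; the normalizer falls back to 1000
        # "propagation": {...}     # Optional[JSONNormalizerConfigPropagation], fields not shown in this diff
    }

    # _is_complex_type() reads the limit as
    #   schema._normalizers_config["json"].get("config", {}).get("max_nesting", 1000)
    # and treats everything at or beyond that recursion level as a single complex value,
    # so deeper structures stay embedded instead of producing more columns or child tables.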
18 changes: 9 additions & 9 deletions .github/workflows/test_common.yml
@@ -38,22 +38,22 @@ jobs:
virtualenvs-in-project: true
installer-parallel: true

- name: Get pip cache dir
id: pip-cache
run: |
echo "::set-output name=dir::$(poetry run pip cache dir)"
echo "$(poetry run pip cache dir)"
# - name: Get pip cache dir
# id: pip-cache
# run: |
# echo "::set-output name=dir::$(poetry env info -p)"
# echo "$(poetry env info -p)"

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v2
with:
# For some reason, caching your venv does not seem to work as expected on Windows runners. You can see an example of what happens here, where a workflow stalls and runs for over 3 hours before it was manually cancelled.
path: ${{ steps.pip-cache.outputs.dir }}
key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}
# path: ${{ steps.pip-cache.outputs.dir }}
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}

- name: Install dependencies
if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction --no-root

- name: Install self
17 changes: 9 additions & 8 deletions .github/workflows/test_loader_gcp.yml
@@ -44,21 +44,22 @@ jobs:
virtualenvs-in-project: true
installer-parallel: true

- name: Get pip cache dir
id: pip-cache
run: |
echo "::set-output name=dir::$(poetry run pip cache dir)"
echo "$(poetry run pip cache dir)"
# - name: Get pip cache dir
# id: pip-cache
# run: |
# echo "::set-output name=dir::$(poetry env info -p)"
# echo "$(poetry env info -p)"

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v2
with:
path: ${{ steps.pip-cache.outputs.dir }}
key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp
# path: ${{ steps.pip-cache.outputs.dir }}
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction --no-root -E gcp

- name: Install self
17 changes: 9 additions & 8 deletions .github/workflows/test_loader_redshift.yml
@@ -44,21 +44,22 @@ jobs:
virtualenvs-in-project: true
installer-parallel: true

- name: Get pip cache dir
id: pip-cache
run: |
echo "::set-output name=dir::$(poetry run pip cache dir)"
echo "$(poetry run pip cache dir)"
# - name: Get pip cache dir
# id: pip-cache
# run: |
# echo "::set-output name=dir::$(poetry env info -p)"
# echo "$(poetry env info -p)"

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v2
with:
path: ${{ steps.pip-cache.outputs.dir }}
key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift
# path: ${{ steps.pip-cache.outputs.dir }}
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift

- name: Install dependencies
if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
# if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
run: poetry install --no-interaction --no-root -E redshift

- name: Install self
4 changes: 2 additions & 2 deletions dlt/common/file_storage.py
@@ -43,7 +43,7 @@ def save_atomic(storage_path: str, relative_path: str, data: Any, file_type: str

def load(self, relative_path: str) -> Any:
# raises on file not existing
with self.open(relative_path) as text_file:
with self.open_file(relative_path) as text_file:
return text_file.read()

def delete(self, relative_path: str) -> None:
@@ -63,7 +63,7 @@ def delete_folder(self, relative_path: str, recursively: bool = False) -> None:
else:
raise NotADirectoryError(folder_path)

def open(self, realtive_path: str, mode: str = "r") -> IO[Any]:
def open_file(self, realtive_path: str, mode: str = "r") -> IO[Any]:
mode = mode + self.file_type
return open(self._make_path(realtive_path), mode, encoding=encoding_for_mode(mode))

8 changes: 4 additions & 4 deletions dlt/common/logger.py
@@ -57,7 +57,7 @@ def logToRoot(message: str, *args: Any, **kwargs: Any) -> None:


class _MetricsFormatter(logging.Formatter):
def format(self, record: LogRecord) -> str:
def format(self, record: LogRecord) -> str: # noqa: A003
s = super(_MetricsFormatter, self).format(record)
if record.exc_text:
s = s + '|'
@@ -78,7 +78,7 @@ def _format_log_object(self, record: LogRecord, request_util: Any) -> Any:
return json_log_object


def _init_logging(logger_name: str, level: str, format: str, component: str, version: StrStr) -> Logger:
def _init_logging(logger_name: str, level: str, fmt: str, component: str, version: StrStr) -> Logger:
if logger_name == "root":
logging.basicConfig(level=level)
handler = logging.getLogger().handlers[0]
@@ -93,7 +93,7 @@ def _init_logging(logger_name: str, level: str, format: str, component: str, ver
logger.addHandler(handler)

# set right formatter
if is_json_logging(format):
if is_json_logging(fmt):
json_logging.COMPONENT_NAME = component
json_logging.JSON_SERIALIZER = json.dumps
json_logging.RECORD_ATTR_SKIP_LIST.remove("process")
@@ -104,7 +104,7 @@ def _init_logging(logger_name: str, level: str, format: str, component: str, ver
if logger_name == "root":
json_logging.config_root_logger()
else:
handler.setFormatter(_MetricsFormatter(fmt=format, style='{'))
handler.setFormatter(_MetricsFormatter(fmt=fmt, style='{'))

return logger

147 changes: 80 additions & 67 deletions dlt/common/normalizers/json/relational.py
@@ -1,4 +1,4 @@
from typing import Mapping, Optional, cast, TypedDict, Any
from typing import Dict, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any

from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumn, TColumnName, TSimpleRegex
@@ -33,12 +33,18 @@ class JSONNormalizerConfigPropagation(TypedDict, total=True):

class JSONNormalizerConfig(TypedDict, total=True):
generate_dlt_id: Optional[bool]
max_nesting: Optional[int]
propagation: Optional[JSONNormalizerConfigPropagation]


# for those paths the complex nested objects should be left in place
# current use case: we want to preserve event_slot__value in db even if it's an object
def _is_complex_type(schema: Schema, table_name: str, field_name: str) -> bool:
def _is_complex_type(schema: Schema, table_name: str, field_name: str, _r_lvl: int) -> bool:
# turn everything at the recursion level into complex type
max_nesting = schema._normalizers_config["json"].get("config", {}).get("max_nesting", 1000)
assert _r_lvl <= max_nesting
if _r_lvl == max_nesting:
return True
# or use definition in the schema
column: TColumn = None
table = schema._schema_tables.get(table_name)
if table:
@@ -50,35 +56,49 @@ def _is_complex_type(schema: Schema, table_name: str, field_name: str) -> bool:
return data_type == "complex"


def _flatten(schema: Schema, table: str, dict_row: TEventRow) -> TEventRow:
def _flatten(schema: Schema, table: str, dict_row: TEventRow, _r_lvl: int) -> Tuple[TEventRow, Dict[str, Sequence[Any]]]:
out_rec_row: DictStrAny = {}
out_rec_list: Dict[str, Sequence[Any]] = {}

def norm_row_dicts(dict_row: StrAny, parent_name: Optional[str]) -> None:
def norm_row_dicts(dict_row: StrAny, __r_lvl: int, parent_name: Optional[str]) -> None:
for k, v in dict_row.items():
corrected_k = schema.normalize_column_name(k)
child_name = corrected_k if not parent_name else schema.normalize_make_path(parent_name, corrected_k)
if isinstance(v, dict):
if _is_complex_type(schema, table, child_name):
out_rec_row[child_name] = v
# for lists and dicts we must check if type is possibly complex
if isinstance(v, dict) or isinstance(v, list):
if not _is_complex_type(schema, table, child_name, __r_lvl):
if isinstance(v, dict):
# flatten the dict more
norm_row_dicts(v, __r_lvl + 1, parent_name=child_name)
else:
# pass the list to out_rec_list
out_rec_list[child_name] = v
continue
else:
norm_row_dicts(v, parent_name=child_name)
else:
out_rec_row[child_name] = v
# pass the complex value to out_rec_row
pass

norm_row_dicts(dict_row, None)
return cast(TEventRow, out_rec_row)
out_rec_row[child_name] = v

norm_row_dicts(dict_row, _r_lvl, None)
return cast(TEventRow, out_rec_row), out_rec_list

def _get_child_row_hash(parent_hash: str, child_table: str, list_pos: int) -> str:

def _get_child_row_hash(parent_row_id: str, child_table: str, list_idx: int) -> str:
# create deterministic unique id of the child row taking into account that all lists are ordered
# and all child tables must be lists
return digest128(f"{parent_hash}_{child_table}_{list_pos}")
return digest128(f"{parent_row_id}_{child_table}_{list_idx}")


def _add_linking(row: TEventRowChild, extend: DictStrAny, parent_row_id: str, list_idx: int) -> TEventRowChild:
row["_dlt_parent_id"] = parent_row_id
row["_dlt_list_idx"] = list_idx
row.update(extend) # type: ignore

return row


def _get_content_hash(schema: Schema, table: str, row: StrAny) -> str:
# generate content hashes only for tables with merge content disposition
# TODO: extend schema with write disposition
# WARNING: row may contain complex types: exclude from hash
return digest128(uniq_id())


@@ -100,87 +120,80 @@ def _get_propagated_values(schema: Schema, table: str, row: TEventRow, is_top_le
return extend


# generate child tables only for lists
def _normalize_list(
schema: Schema,
seq: Sequence[Any],
extend: DictStrAny,
table: str,
parent_table: str,
parent_row_id: Optional[str] = None,
_r_lvl: int = 0
) -> TUnpackedRowIterator:

v: TEventRowChild = None
for idx, v in enumerate(seq):
# yield child table row
if isinstance(v, dict):
yield from _normalize_row(schema, v, extend, table, parent_table, parent_row_id, idx, _r_lvl)
elif isinstance(v, list):
# unpack lists of lists, we assume all lists in the list have the same type so they should go to the same table
list_table_name = schema.normalize_make_path(table, "list")
yield from _normalize_list(schema, v, extend, list_table_name, parent_table, parent_row_id, _r_lvl + 1)
else:
# list of simple types
child_row_hash = _get_child_row_hash(parent_row_id, table, idx)
e = _add_linking({"value": v, "_dlt_id": child_row_hash}, extend, parent_row_id, idx)
yield (table, parent_table), e


def _normalize_row(
schema: Schema,
dict_row: TEventRow,
extend: DictStrAny,
table: str,
parent_table: Optional[str] = None,
parent_hash: Optional[str] = None,
pos: Optional[int] = None
parent_row_id: Optional[str] = None,
pos: Optional[int] = None,
_r_lvl: int = 0
) -> TUnpackedRowIterator:

def _add_linking(_child_row: TEventRowChild, _p_hash: str, _p_pos: int) -> TEventRowChild:
_child_row["_dlt_parent_id"] = _p_hash
_child_row["_dlt_list_idx"] = _p_pos
_child_row.update(extend) # type: ignore

return _child_row

is_top_level = parent_table is None

# flatten current row
flattened_row = _flatten(schema, table, dict_row)
# flatten current row and extract all lists to recur into
flattened_row, lists = _flatten(schema, table, dict_row, _r_lvl)
# infer record hash or leave existing primary key if present
record_hash = flattened_row.get("_dlt_id", None)
if not record_hash:
row_id = flattened_row.get("_dlt_id", None)
if not row_id:
# check if we have primary key: if so use it
primary_key = schema.filter_row_with_hint(table, "primary_key", flattened_row)
if primary_key:
# create row id from primary key
record_hash = digest128("_".join(map(lambda v: str(v), primary_key.values())))
row_id = digest128("_".join(map(lambda v: str(v), primary_key.values())))
elif not is_top_level:
# child table row deterministic hash
record_hash = _get_child_row_hash(parent_hash, table, pos)
row_id = _get_child_row_hash(parent_row_id, table, pos)
# link to parent table
_add_linking(cast(TEventRowChild, flattened_row), parent_hash, pos)
_add_linking(cast(TEventRowChild, flattened_row), extend, parent_row_id, pos)
else:
# create hash based on the content of the row
record_hash = _get_content_hash(schema, table, flattened_row)
flattened_row["_dlt_id"] = record_hash
row_id = _get_content_hash(schema, table, flattened_row)
flattened_row["_dlt_id"] = row_id

# find fields to propagate to child tables in config
extend.update(_get_propagated_values(schema, table, flattened_row, is_top_level))

# remove all lists from row before yielding
lists = {k: flattened_row[k] for k in flattened_row if isinstance(flattened_row[k], list)} # type: ignore
for k in list(lists.keys()):
# leave complex lists in flattened_row
if not _is_complex_type(schema, table, k):
# remove child list
del flattened_row[k] # type: ignore
else:
pass
# delete complex types from
del lists[k]

# yield parent table first
yield (table, parent_table), flattened_row

# generate child tables only for lists
# normalize and yield lists
for k, list_content in lists.items():
child_table = schema.normalize_make_path(table, k)
# this will skip empty lists
v: TEventRowChild = None
for idx, v in enumerate(list_content):
# yield child table row
tv = type(v)
if tv is dict:
yield from _normalize_row(schema, v, extend, child_table, table, record_hash, idx)
elif tv is list:
# unpack lists of lists
raise ValueError(v)
else:
# list of simple types
child_row_hash = _get_child_row_hash(record_hash, child_table, idx)
e = _add_linking({"value": v, "_dlt_id": child_row_hash}, record_hash, idx)
yield (child_table, table), e
yield from _normalize_list(schema, list_content, extend, schema.normalize_make_path(table, k), table, row_id, _r_lvl + 1)


def extend_schema(schema: Schema) -> None:
# validate config
config = schema._normalizers_config["json"].get("config", {})
validate_dict(JSONNormalizerConfig, config, "./normalizers/json/config", validator=column_name_validator(schema.normalize_column_name))
validate_dict(JSONNormalizerConfig, config, "./normalizers/json/config", validator_f=column_name_validator(schema.normalize_column_name))

# quick check to see if hints are applied
default_hints = schema.schema_settings.get("default_hints", {})
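
To make the new list handling concrete, here is a condensed, self-contained sketch of what _normalize_list yields for a list of lists. It is not the dlt code: digest128 and Schema.normalize_make_path are replaced by stand-ins (assuming the default "__" path separator), dict elements are skipped, and the table names are invented for the example; only the child-table naming, the value/_dlt_id/_dlt_parent_id/_dlt_list_idx row shape, and the parent linkage follow the diff above.

    from hashlib import sha256
    from typing import Any, Dict, Iterator, Tuple

    def _row_id(parent_id: str, table: str, idx: int) -> str:
        # stand-in for digest128(f"{parent_row_id}_{child_table}_{list_idx}")
        return sha256(f"{parent_id}_{table}_{idx}".encode()).hexdigest()[:32]

    def normalize_list(seq: list, table: str, parent_table: str, parent_id: str) -> Iterator[Tuple[Tuple[str, str], Dict[str, Any]]]:
        for idx, v in enumerate(seq):
            if isinstance(v, list):
                # list of lists: recur into a synthetic "<table>__list" child table,
                # keeping the same parent row id, as _normalize_list above does
                yield from normalize_list(v, f"{table}__list", parent_table, parent_id)
            elif isinstance(v, dict):
                # dict elements go through _normalize_row in the real code; omitted here
                continue
            else:
                # simple value: one linked child row per element
                yield (table, parent_table), {
                    "value": v,
                    "_dlt_id": _row_id(parent_id, table, idx),
                    "_dlt_parent_id": parent_id,
                    "_dlt_list_idx": idx,
                }

    for (tbl, parent), row in normalize_list([[1, 2], [3]], "event__scores", "event", "p1"):
        print(tbl, parent, row["_dlt_list_idx"], row["value"])
    # event__scores__list event 0 1
    # event__scores__list event 1 2
    # event__scores__list event 0 3

Note that elements of different inner lists restart their _dlt_list_idx at 0, mirroring how the recursion above passes the same parent_row_id into each inner list.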
(diffs for the remaining changed files are not shown)
