From dcf39895b4b20da09dee766f47ffc7b7e130719c Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 29 Jun 2022 16:28:41 +0200 Subject: [PATCH 1/6] caches poetry venv root in ci --- .github/workflows/test_common.yml | 7 +++---- .github/workflows/test_loader_gcp.yml | 6 +++--- .github/workflows/test_loader_redshift.yml | 6 +++--- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index 95f2d1bb59..bcca44e05a 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -41,19 +41,18 @@ jobs: - name: Get pip cache dir id: pip-cache run: | - echo "::set-output name=dir::$(poetry run pip cache dir)" - echo "$(poetry run pip cache dir)" + echo "::set-output name=dir::$(poetry env info -p)" + echo "$(poetry env info -p)" - name: Load cached venv id: cached-poetry-dependencies uses: actions/cache@v2 with: - # For some reason, caching your venv does not seem to work as expected on Windows runners. You can see an example of what happens here, where a workflow stalls and runs for over 3 hours before it was manually cancelled. path: ${{ steps.pip-cache.outputs.dir }} key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }} - name: Install dependencies - if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' + # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' run: poetry install --no-interaction --no-root - name: Install self diff --git a/.github/workflows/test_loader_gcp.yml b/.github/workflows/test_loader_gcp.yml index 93fa1911c7..2dca754ec5 100644 --- a/.github/workflows/test_loader_gcp.yml +++ b/.github/workflows/test_loader_gcp.yml @@ -47,8 +47,8 @@ jobs: - name: Get pip cache dir id: pip-cache run: | - echo "::set-output name=dir::$(poetry run pip cache dir)" - echo "$(poetry run pip cache dir)" + echo "::set-output name=dir::$(poetry env info -p)" + echo "$(poetry env info -p)" - name: Load cached venv id: cached-poetry-dependencies @@ -58,7 +58,7 @@ jobs: key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies - if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' + # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' run: poetry install --no-interaction --no-root -E gcp - name: Install self diff --git a/.github/workflows/test_loader_redshift.yml b/.github/workflows/test_loader_redshift.yml index 13cb0d1066..c10b4824aa 100644 --- a/.github/workflows/test_loader_redshift.yml +++ b/.github/workflows/test_loader_redshift.yml @@ -47,8 +47,8 @@ jobs: - name: Get pip cache dir id: pip-cache run: | - echo "::set-output name=dir::$(poetry run pip cache dir)" - echo "$(poetry run pip cache dir)" + echo "::set-output name=dir::$(poetry env info -p)" + echo "$(poetry env info -p)" - name: Load cached venv id: cached-poetry-dependencies @@ -58,7 +58,7 @@ jobs: key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift - name: Install dependencies - if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' + # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' run: poetry install --no-interaction --no-root -E redshift - name: Install self From c00c1cfe819044bc1bfebd92f6f3273cf91ad42e Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 29 Jun 2022 16:30:55 +0200 Subject: [PATCH 2/6] adds recursion depth limit and list of 
list normalization + tests --- dlt/common/normalizers/json/relational.py | 147 ++++++++++-------- .../normalizers/test_json_relational.py | 92 ++++++++++- 2 files changed, 164 insertions(+), 75 deletions(-) diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py index 734814715f..c0244a2360 100644 --- a/dlt/common/normalizers/json/relational.py +++ b/dlt/common/normalizers/json/relational.py @@ -1,4 +1,4 @@ -from typing import Mapping, Optional, cast, TypedDict, Any +from typing import Dict, Mapping, Optional, Sequence, Tuple, cast, TypedDict, Any from dlt.common.schema import Schema from dlt.common.schema.typing import TColumn, TColumnName, TSimpleRegex @@ -33,12 +33,18 @@ class JSONNormalizerConfigPropagation(TypedDict, total=True): class JSONNormalizerConfig(TypedDict, total=True): generate_dlt_id: Optional[bool] + max_nesting: Optional[int] propagation: Optional[JSONNormalizerConfigPropagation] # for those paths the complex nested objects should be left in place -# current use case: we want to preserve event_slot__value in db even if it's an object -def _is_complex_type(schema: Schema, table_name: str, field_name: str) -> bool: +def _is_complex_type(schema: Schema, table_name: str, field_name: str, _r_lvl: int) -> bool: + # turn everything at the recursion level into complex type + max_nesting = schema._normalizers_config["json"].get("config", {}).get("max_nesting", 1000) + assert _r_lvl <= max_nesting + if _r_lvl == max_nesting: + return True + # or use definition in the schema column: TColumn = None table = schema._schema_tables.get(table_name) if table: @@ -50,35 +56,49 @@ def _is_complex_type(schema: Schema, table_name: str, field_name: str) -> bool: return data_type == "complex" -def _flatten(schema: Schema, table: str, dict_row: TEventRow) -> TEventRow: +def _flatten(schema: Schema, table: str, dict_row: TEventRow, _r_lvl: int) -> Tuple[TEventRow, Dict[str, Sequence[Any]]]: out_rec_row: DictStrAny = {} + out_rec_list: Dict[str, Sequence[Any]] = {} - def norm_row_dicts(dict_row: StrAny, parent_name: Optional[str]) -> None: + def norm_row_dicts(dict_row: StrAny, __r_lvl: int, parent_name: Optional[str]) -> None: for k, v in dict_row.items(): corrected_k = schema.normalize_column_name(k) child_name = corrected_k if not parent_name else schema.normalize_make_path(parent_name, corrected_k) - if isinstance(v, dict): - if _is_complex_type(schema, table, child_name): - out_rec_row[child_name] = v + # for lists and dicts we must check if type is possibly complex + if isinstance(v, dict) or isinstance(v, list): + if not _is_complex_type(schema, table, child_name, __r_lvl): + if isinstance(v, dict): + # flatten the dict more + norm_row_dicts(v, __r_lvl + 1, parent_name=child_name) + else: + # pass the list to out_rec_list + out_rec_list[child_name] = v + continue else: - norm_row_dicts(v, parent_name=child_name) - else: - out_rec_row[child_name] = v + # pass the complex value to out_rec_row + pass - norm_row_dicts(dict_row, None) - return cast(TEventRow, out_rec_row) + out_rec_row[child_name] = v + norm_row_dicts(dict_row, _r_lvl, None) + return cast(TEventRow, out_rec_row), out_rec_list -def _get_child_row_hash(parent_hash: str, child_table: str, list_pos: int) -> str: + +def _get_child_row_hash(parent_row_id: str, child_table: str, list_idx: int) -> str: # create deterministic unique id of the child row taking into account that all lists are ordered # and all child tables must be lists - return digest128(f"{parent_hash}_{child_table}_{list_pos}") 
+ return digest128(f"{parent_row_id}_{child_table}_{list_idx}") + + +def _add_linking(row: TEventRowChild, extend: DictStrAny, parent_row_id: str, list_idx: int) -> TEventRowChild: + row["_dlt_parent_id"] = parent_row_id + row["_dlt_list_idx"] = list_idx + row.update(extend) # type: ignore + + return row def _get_content_hash(schema: Schema, table: str, row: StrAny) -> str: - # generate content hashes only for tables with merge content disposition - # TODO: extend schema with write disposition - # WARNING: row may contain complex types: exclude from hash return digest128(uniq_id()) @@ -100,87 +120,80 @@ def _get_propagated_values(schema: Schema, table: str, row: TEventRow, is_top_le return extend +# generate child tables only for lists +def _normalize_list( + schema: Schema, + seq: Sequence[Any], + extend: DictStrAny, + table: str, + parent_table: str, + parent_row_id: Optional[str] = None, + _r_lvl: int = 0 +) -> TUnpackedRowIterator: + + v: TEventRowChild = None + for idx, v in enumerate(seq): + # yield child table row + if isinstance(v, dict): + yield from _normalize_row(schema, v, extend, table, parent_table, parent_row_id, idx, _r_lvl) + elif isinstance(v, list): + # unpack lists of lists, we assume all lists in the list have the same type so they should go to the same table + list_table_name = schema.normalize_make_path(table, "list") + yield from _normalize_list(schema, v, extend, list_table_name, parent_table, parent_row_id, _r_lvl + 1) + else: + # list of simple types + child_row_hash = _get_child_row_hash(parent_row_id, table, idx) + e = _add_linking({"value": v, "_dlt_id": child_row_hash}, extend, parent_row_id, idx) + yield (table, parent_table), e + + def _normalize_row( schema: Schema, dict_row: TEventRow, extend: DictStrAny, table: str, parent_table: Optional[str] = None, - parent_hash: Optional[str] = None, - pos: Optional[int] = None + parent_row_id: Optional[str] = None, + pos: Optional[int] = None, + _r_lvl: int = 0 ) -> TUnpackedRowIterator: - def _add_linking(_child_row: TEventRowChild, _p_hash: str, _p_pos: int) -> TEventRowChild: - _child_row["_dlt_parent_id"] = _p_hash - _child_row["_dlt_list_idx"] = _p_pos - _child_row.update(extend) # type: ignore - - return _child_row - is_top_level = parent_table is None - - # flatten current row - flattened_row = _flatten(schema, table, dict_row) + # flatten current row and extract all lists to recur into + flattened_row, lists = _flatten(schema, table, dict_row, _r_lvl) # infer record hash or leave existing primary key if present - record_hash = flattened_row.get("_dlt_id", None) - if not record_hash: + row_id = flattened_row.get("_dlt_id", None) + if not row_id: # check if we have primary key: if so use it primary_key = schema.filter_row_with_hint(table, "primary_key", flattened_row) if primary_key: # create row id from primary key - record_hash = digest128("_".join(map(lambda v: str(v), primary_key.values()))) + row_id = digest128("_".join(map(lambda v: str(v), primary_key.values()))) elif not is_top_level: # child table row deterministic hash - record_hash = _get_child_row_hash(parent_hash, table, pos) + row_id = _get_child_row_hash(parent_row_id, table, pos) # link to parent table - _add_linking(cast(TEventRowChild, flattened_row), parent_hash, pos) + _add_linking(cast(TEventRowChild, flattened_row), extend, parent_row_id, pos) else: # create hash based on the content of the row - record_hash = _get_content_hash(schema, table, flattened_row) - flattened_row["_dlt_id"] = record_hash + row_id = _get_content_hash(schema, 
table, flattened_row) + flattened_row["_dlt_id"] = row_id # find fields to propagate to child tables in config extend.update(_get_propagated_values(schema, table, flattened_row, is_top_level)) - # remove all lists from row before yielding - lists = {k: flattened_row[k] for k in flattened_row if isinstance(flattened_row[k], list)} # type: ignore - for k in list(lists.keys()): - # leave complex lists in flattened_row - if not _is_complex_type(schema, table, k): - # remove child list - del flattened_row[k] # type: ignore - else: - pass - # delete complex types from - del lists[k] - # yield parent table first yield (table, parent_table), flattened_row - # generate child tables only for lists + # normalize and yield lists for k, list_content in lists.items(): - child_table = schema.normalize_make_path(table, k) - # this will skip empty lists - v: TEventRowChild = None - for idx, v in enumerate(list_content): - # yield child table row - tv = type(v) - if tv is dict: - yield from _normalize_row(schema, v, extend, child_table, table, record_hash, idx) - elif tv is list: - # unpack lists of lists - raise ValueError(v) - else: - # list of simple types - child_row_hash = _get_child_row_hash(record_hash, child_table, idx) - e = _add_linking({"value": v, "_dlt_id": child_row_hash}, record_hash, idx) - yield (child_table, table), e + yield from _normalize_list(schema, list_content, extend, schema.normalize_make_path(table, k), table, row_id, _r_lvl + 1) def extend_schema(schema: Schema) -> None: # validate config config = schema._normalizers_config["json"].get("config", {}) - validate_dict(JSONNormalizerConfig, config, "./normalizers/json/config", validator=column_name_validator(schema.normalize_column_name)) + validate_dict(JSONNormalizerConfig, config, "./normalizers/json/config", validator_f=column_name_validator(schema.normalize_column_name)) # quick check to see if hints are applied default_hints = schema.schema_settings.get("default_hints", {}) diff --git a/tests/common/normalizers/test_json_relational.py b/tests/common/normalizers/test_json_relational.py index aed55395c9..aefa189389 100644 --- a/tests/common/normalizers/test_json_relational.py +++ b/tests/common/normalizers/test_json_relational.py @@ -28,16 +28,19 @@ def test_flatten_fix_field_name(schema: Schema) -> None: } } } - flattened_row = _flatten(schema, "mock_table", row) + flattened_row, lists = _flatten(schema, "mock_table", row) assert "f_1" in flattened_row - assert "f_2" in flattened_row + # assert "f_2" in flattened_row assert "f_3__f4" in flattened_row assert "f_3__f_5" in flattened_row assert "f_3__f_6__c" in flattened_row assert "f_3__f_6__c_v" in flattened_row - assert "f_3__f_6__c_x" in flattened_row + # assert "f_3__f_6__c_x" in flattened_row assert "f_3" not in flattened_row + assert "f_2" in lists + assert "f_3__f_6__c_x" in lists + def test_preserve_complex_value(schema: Schema) -> None: # add table with complex column @@ -52,13 +55,13 @@ def test_preserve_complex_value(schema: Schema) -> None: row_1 = { "value": 1 } - flattened_row = _flatten(schema, "with_complex", row_1) + flattened_row, _ = _flatten(schema, "with_complex", row_1) assert flattened_row["value"] == 1 row_2 = { "value": {"complex": True} } - flattened_row = _flatten(schema, "with_complex", row_2) + flattened_row, _ = _flatten(schema, "with_complex", row_2) assert flattened_row["value"] == row_2["value"] # complex value is not flattened assert "value__complex" not in flattened_row @@ -68,18 +71,17 @@ def test_preserve_complex_value_with_hint(schema: 
Schema) -> None: # add preferred type for "value" schema._settings.setdefault("preferred_types", {})["re:^value$"] = "complex" schema._compile_regexes() - print(schema._compiled_preferred_types) row_1 = { "value": 1 } - flattened_row = _flatten(schema, "any_table", row_1) + flattened_row, _ = _flatten(schema, "any_table", row_1) assert flattened_row["value"] == 1 row_2 = { "value": {"complex": True} } - flattened_row = _flatten(schema, "any_table", row_2) + flattened_row, _ = _flatten(schema, "any_table", row_2) assert flattened_row["value"] == row_2["value"] # complex value is not flattened assert "value__complex" not in flattened_row @@ -302,6 +304,20 @@ def test_list_position(schema: Schema) -> None: assert row["_dlt_list_idx"] == pos +def test_list_of_lists(schema: Schema) -> None: + row = { + "l":[ + ["a", "b", "c"], + [ + ["a", "b", "b"] + ], + "a", 1, 1.1 + ] + } + rows = list(_normalize_row(schema, row, {}, "table")) + print(rows) + + def test_child_row_deterministic_hash(schema: Schema) -> None: row_id = uniq_id() # directly set record hash so it will be adopted in unpacker as top level hash @@ -453,6 +469,66 @@ def test_preserves_complex_types_list(schema: Schema) -> None: root_row = next(r for r in normalized_rows if r[0][1] is None) assert root_row[1]["value"] == row["value"] + # same should work for a list + row = { + "value": ["from", ["complex", True]] + } + normalized_rows = list(_normalize_row(schema, row, {}, "event_slot")) + # make sure only 1 row is emitted, the list is not unpacked + assert len(normalized_rows) == 1 + # value is kept in root row -> market as complex + root_row = next(r for r in normalized_rows if r[0][1] is None) + assert root_row[1]["value"] == row["value"] + + +def test_complex_types_for_recursion_level(schema: Schema) -> None: + add_dlt_root_id_propagation(schema) + # if max recursion depth is set, nested elements will be kept as complex + row = { + "_dlt_id": "row_id", + "f": [{ + "l": ["a"], # , "b", "c" + "v": 120, + "lo": [{"e": {"v": 1}}] # , {"e": {"v": 2}}, {"e":{"v":3 }} + }] + } + n_rows_nl = list(schema.normalize_json(schema, row, "load_id")) + # all nested elements were yielded + assert ["default", "default__f", "default__f__l", "default__f__lo"] == [r[0][0] for r in n_rows_nl] + + # set max nesting to 0 + schema._normalizers_config["json"]["config"]["max_nesting"] = 0 + n_rows = list(schema.normalize_json(schema, row, "load_id")) + # the "f" element is left as complex type and not normalized + assert len(n_rows) == 1 + assert n_rows[0][0][0] == "default" + assert "f" in n_rows[0][1] + assert type(n_rows[0][1]["f"]) is list + + # max nesting 1 + schema._normalizers_config["json"]["config"]["max_nesting"] = 1 + n_rows = list(schema.normalize_json(schema, row, "load_id")) + assert len(n_rows) == 2 + assert ["default", "default__f"] == [r[0][0] for r in n_rows] + # on level f, "l" and "lo" are not normalized + assert "l" in n_rows[1][1] + assert type(n_rows[1][1]["l"]) is list + assert "lo" in n_rows[1][1] + assert type(n_rows[1][1]["lo"]) is list + + # max nesting 2 + schema._normalizers_config["json"]["config"]["max_nesting"] = 2 + n_rows = list(schema.normalize_json(schema, row, "load_id")) + assert len(n_rows) == 4 + # in default__f__lo the dicts that would be flattened are complex types + last_row = n_rows[3] + assert last_row[1]["e"] == {"v": 1} + + # max nesting 3 + schema._normalizers_config["json"]["config"]["max_nesting"] = 3 + n_rows = list(schema.normalize_json(schema, row, "load_id")) + assert n_rows_nl == n_rows + def 
test_extract_with_table_name_meta() -> None: row = { From ecaf29ccceafc1e49988300e75e4e3732c8d1995 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 29 Jun 2022 16:31:46 +0200 Subject: [PATCH 3/6] adds data type autodetection + tests --- dlt/common/schema/detections.py | 34 +++++++++++++++++++++++ dlt/common/schema/schema.py | 16 +++++------ dlt/common/schema/typing.py | 5 +++- tests/common/schema/test_coercion.py | 12 +++++++-- tests/common/schema/test_detections.py | 37 ++++++++++++++++++++++++++ tests/common/schema/test_inference.py | 10 ++++++- 6 files changed, 102 insertions(+), 12 deletions(-) create mode 100644 dlt/common/schema/detections.py create mode 100644 tests/common/schema/test_detections.py diff --git a/dlt/common/schema/detections.py b/dlt/common/schema/detections.py new file mode 100644 index 0000000000..bcb745d724 --- /dev/null +++ b/dlt/common/schema/detections.py @@ -0,0 +1,34 @@ +import datetime # noqa: 251 +from typing import Any, Optional, Type + +from dlt.common import pendulum +from dlt.common.schema.typing import TDataType + + +_NOW_TS: float = pendulum.now().timestamp() +_FLOAT_TS_RANGE = 31536000.0 # seconds in year + + +def is_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]: + # autodetect int and float withing 1 year range of NOW + if t in [int, float]: + if v >= _NOW_TS - _FLOAT_TS_RANGE and v <= _NOW_TS + _FLOAT_TS_RANGE: + return "timestamp" + return None + + +def is_iso_timestamp(t: Type[Any], v: Any) -> Optional[TDataType]: + # only strings can be converted + if t is not str: + return None + if not v: + return None + # strict autodetection of iso timestamps + try: + dt = pendulum.parse(v, strict=True, exact=True) + if isinstance(dt, datetime.datetime): + return "timestamp" + except Exception: + pass + return None + diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index 9e150e63dc..2bb9ee0d0f 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -197,7 +197,7 @@ def filter_row_with_hint(self, table_name: str, hint_type: THintType, row: StrAn def merge_hints(self, new_hints: Mapping[THintType, Sequence[TSimpleRegex]]) -> None: # validate regexes - validate_dict(TSchemaSettings, {"default_hints": new_hints}, ".", validator=utils.simple_regex_validator) + validate_dict(TSchemaSettings, {"default_hints": new_hints}, ".", validator_f=utils.simple_regex_validator) # prepare hints to be added default_hints = self._settings.setdefault("default_hints", {}) # add `new_hints` to existing hints @@ -315,7 +315,12 @@ def _coerce_non_null_value(self, table_columns: TTableColumns, table_name: str, return variant_col_name, new_column, rv def _map_value_to_column_type(self, v: Any, k: str) -> TDataType: - mapped_type = utils.py_type_to_sc_type(type(v)) + tv = type(v) + # try to autodetect data type + mapped_type = utils.autodetect_sc_type(self._normalizers_config.get("detections"), tv, v) + # if not try standard type mapping + if mapped_type is None: + mapped_type = utils.py_type_to_sc_type(tv) # get preferred type based on column name preferred_type = self.get_preferred_type(k) # try to match python type to preferred @@ -348,12 +353,7 @@ def _add_standard_hints(self) -> None: def _configure_normalizers(self) -> None: if not self._normalizers_config: # create default normalizer config - self._normalizers_config = { - "names": "dlt.common.normalizers.names.snake_case", - "json": { - "module": "dlt.common.normalizers.json.relational" - } - } + self._normalizers_config = utils.default_normalizers() # import desired 
modules naming_module = import_module(self._normalizers_config["names"]) json_module = import_module(self._normalizers_config["json"]["module"]) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index f9c36f9175..7efcf08972 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Literal, Optional, Set, TypedDict, NewType +from typing import Any, Callable, Dict, List, Literal, Optional, Set, Type, TypedDict, NewType from dlt.common.typing import StrAny @@ -7,6 +7,8 @@ THintType = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"] TColumnProp = Literal["name", "data_type", "nullable", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"] TWriteDisposition = Literal["skip", "append", "replace", "merge"] +TTypeDetections = Literal["timestamp", "iso_timestamp"] +TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]] DATA_TYPES: Set[TDataType] = set(["text", "double", "bool", "timestamp", "bigint", "binary", "complex", "decimal", "wei"]) COLUMN_PROPS: Set[TColumnProp] = set(["name", "data_type", "nullable", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique"]) @@ -66,6 +68,7 @@ class TJSONNormalizer(TypedDict, total=False): class TNormalizersConfig(TypedDict, total=True): names: str + detections: Optional[List[TTypeDetections]] json: TJSONNormalizer diff --git a/tests/common/schema/test_coercion.py b/tests/common/schema/test_coercion.py index d6b7be0240..0db1e1c89e 100644 --- a/tests/common/schema/test_coercion.py +++ b/tests/common/schema/test_coercion.py @@ -1,8 +1,10 @@ import pytest +import datetime # noqa: I251 -from dlt.common import Decimal, json +from dlt.common import Decimal, json, pendulum from dlt.common.schema import utils + def test_coerce_type() -> None: # same type coercion assert utils.coerce_type("double", "double", 8721.1) == 8721.1 @@ -85,6 +87,12 @@ def test_coerce_type_binary() -> None: assert utils.coerce_type("binary", "text", "!YmluYXJ5IHN0cmluZw==") +def test_py_type_to_sc_type() -> None: + assert utils.py_type_to_sc_type(type(pendulum.now())) == "timestamp" + assert utils.py_type_to_sc_type(type(datetime.datetime(1988, 12, 1))) == "timestamp" + assert utils.py_type_to_sc_type(type(Decimal(1))) == "decimal" + + def test_coerce_type_complex() -> None: # dicts and lists should be coerced into strings automatically v_list = [1, 2, "3", {"complex": True}] @@ -99,4 +107,4 @@ def test_coerce_type_complex() -> None: assert utils.coerce_type("text", "complex", v_list) == json.dumps(v_list) # all other coercions fail with pytest.raises(ValueError): - utils.coerce_type("binary", "complex", v_list) \ No newline at end of file + utils.coerce_type("binary", "complex", v_list) diff --git a/tests/common/schema/test_detections.py b/tests/common/schema/test_detections.py new file mode 100644 index 0000000000..12dcd5eb90 --- /dev/null +++ b/tests/common/schema/test_detections.py @@ -0,0 +1,37 @@ +from dlt.common import pendulum +from dlt.common.schema.utils import autodetect_sc_type +from dlt.common.schema.detections import is_timestamp, is_iso_timestamp, _FLOAT_TS_RANGE, _NOW_TS + + +def test_timestamp_detection() -> None: + # datetime.datetime + assert is_timestamp(float, pendulum.now().timestamp()) == "timestamp" + assert is_timestamp(int, pendulum.now().int_timestamp) == "timestamp" + assert is_timestamp(str, pendulum.now().timestamp()) is None + assert is_timestamp(float, _NOW_TS - _FLOAT_TS_RANGE - 0.1) 
is None + assert is_timestamp(float, _NOW_TS + _FLOAT_TS_RANGE + 0.1) is None + + +def test_iso_timestamp_detection() -> None: + assert is_iso_timestamp(str, str(pendulum.now())) == "timestamp" + assert is_iso_timestamp(str, "1975-05-21T22:00:00Z") == "timestamp" + assert is_iso_timestamp(str, "1975-0521T22:00:00Z") == "timestamp" + # dates and times are not accepted + assert is_iso_timestamp(str, "1975-05-21") is None + assert is_iso_timestamp(str, "22:00:00") is None + # wrong formats + assert is_iso_timestamp(str, "0-05-01T27:00:00Z") is None + assert is_iso_timestamp(str, "") is None + assert is_iso_timestamp(str, "1975-05-01T27:00:00Z") is None + assert is_iso_timestamp(str, "1975-0521T22 00:00") is None + assert is_iso_timestamp(str, "Wed, 29 Jun 2022 13:56:34 +0000") is None + # wrong type + assert is_iso_timestamp(float, str(pendulum.now())) is None + + +def test_detection_function() -> None: + assert autodetect_sc_type(None, str, str(pendulum.now())) is None + assert autodetect_sc_type(["iso_timestamp"], str, str(pendulum.now())) == "timestamp" + assert autodetect_sc_type(["iso_timestamp"], float, str(pendulum.now())) is None + assert autodetect_sc_type(["timestamp"], str, str(pendulum.now())) is None + assert autodetect_sc_type(["timestamp", "iso_timestamp"], float, pendulum.now().timestamp()) == "timestamp" diff --git a/tests/common/schema/test_inference.py b/tests/common/schema/test_inference.py index 3d0547aa77..fdf8552676 100644 --- a/tests/common/schema/test_inference.py +++ b/tests/common/schema/test_inference.py @@ -1,6 +1,6 @@ import pytest -from dlt.common import Decimal, json +from dlt.common import Decimal, json, pendulum from dlt.common.schema import Schema from dlt.common.schema.exceptions import CannotCoerceColumnException, CannotCoerceNullException @@ -220,6 +220,14 @@ def test_corece_null_value_over_not_null(schema: Schema) -> None: schema.coerce_row("event_user", None, row) +def test_infer_with_autodetection(schema: Schema) -> None: + c = schema._infer_column("ts", pendulum.now().timestamp()) + assert c["data_type"] == "timestamp" + schema._normalizers_config["detections"] = None + c = schema._infer_column("ts", pendulum.now().timestamp()) + assert c["data_type"] == "double" + + def _add_preferred_types(schema: Schema) -> None: schema._settings["preferred_types"] = {} schema._settings["preferred_types"]["timestamp"] = "timestamp" From 19206f305bb51496957b976ae9649f471d144345 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 29 Jun 2022 16:32:09 +0200 Subject: [PATCH 4/6] adds builtin linter module --- poetry.lock | 20 +++++++++++++++++++- pyproject.toml | 1 + 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 4ce87b4366..52ed86e461 100644 --- a/poetry.lock +++ b/poetry.lock @@ -320,6 +320,20 @@ flake8 = ">=3.0.0" [package.extras] dev = ["coverage", "hypothesis", "hypothesmith (>=0.2)", "pre-commit"] +[[package]] +name = "flake8-builtins" +version = "1.5.3" +description = "Check for python builtins being used as variables or parameters." 
+category = "dev" +optional = false +python-versions = "*" + +[package.dependencies] +flake8 = "*" + +[package.extras] +test = ["coverage", "coveralls", "mock", "pytest", "pytest-cov"] + [[package]] name = "flake8-encodings" version = "0.5.0.post1" @@ -1358,7 +1372,7 @@ redshift = ["psycopg2-binary"] [metadata] lock-version = "1.1" python-versions = "^3.8,<3.11" -content-hash = "b919ccc29a9271c32772c4182f4eb5f4f22be195b21020f22816a6842e871f89" +content-hash = "448cd68c50ebcca752249710d391fdba109746633be811523894c02b0e7da60f" [metadata.files] agate = [ @@ -1519,6 +1533,10 @@ flake8-bugbear = [ {file = "flake8-bugbear-21.11.29.tar.gz", hash = "sha256:8b04cb2fafc6a78e1a9d873bd3988e4282f7959bb6b0d7c1ae648ec09b937a7b"}, {file = "flake8_bugbear-21.11.29-py36.py37.py38-none-any.whl", hash = "sha256:179e41ddae5de5e3c20d1f61736feeb234e70958fbb56ab3c28a67739c8e9a82"}, ] +flake8-builtins = [ + {file = "flake8-builtins-1.5.3.tar.gz", hash = "sha256:09998853b2405e98e61d2ff3027c47033adbdc17f9fe44ca58443d876eb00f3b"}, + {file = "flake8_builtins-1.5.3-py2.py3-none-any.whl", hash = "sha256:7706babee43879320376861897e5d1468e396a40b8918ed7bccf70e5f90b8687"}, +] flake8-encodings = [ {file = "flake8_encodings-0.5.0.post1-py3-none-any.whl", hash = "sha256:d2fecca0e89ba09c86e5d61cf6bdb1b337f0d74746aac67bbcf0c517b4cb6cba"}, {file = "flake8_encodings-0.5.0.post1.tar.gz", hash = "sha256:082c0163325c85b438a8106e876283b5ed3cbfc53e68d89130d70be8be4c9977"}, diff --git a/pyproject.toml b/pyproject.toml index 708fc5f245..5849a7ef19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,7 @@ types-requests = "^2.25.6" types-python-dateutil = "^2.8.15" flake8-tidy-imports = "^4.8.0" flake8-encodings = "^0.5.0" +flake8-builtins = "^1.5.3" [tool.poetry.extras] dbt = ["dbt-core", "GitPython", "dbt-redshift", "dbt-bigquery"] From 007c4c993abc357b9d71ef270db188fa9b6e28be Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 29 Jun 2022 16:32:44 +0200 Subject: [PATCH 5/6] fixes builtin name clashes --- .github/workflows/test_common.yml | 15 ++++---- .github/workflows/test_loader_gcp.yml | 15 ++++---- .github/workflows/test_loader_redshift.yml | 15 ++++---- dlt/common/file_storage.py | 4 +-- dlt/common/logger.py | 8 ++--- dlt/common/schema/utils.py | 35 ++++++++++++++----- dlt/common/storages/loader_storage.py | 4 +-- dlt/common/utils.py | 10 +++--- dlt/common/validation.py | 12 +++---- dlt/dbt_runner/runner.py | 2 +- dlt/dbt_runner/utils.py | 6 ++-- dlt/unpacker/unpacker.py | 2 +- examples/quickstart.py | 12 ++++--- examples/sources/singer_tap.py | 2 +- .../normalizers/test_json_relational.py | 10 +++--- tests/common/storages/test_file_storage.py | 4 +-- tests/common/test_utils.py | 1 - tests/dbt_runner/test_utils.py | 12 +++---- tests/test_validation.py | 2 +- tests/unpacker/test_unpacker.py | 2 +- tests/utils.py | 4 +-- 21 files changed, 100 insertions(+), 77 deletions(-) diff --git a/.github/workflows/test_common.yml b/.github/workflows/test_common.yml index bcca44e05a..dbaada8d54 100644 --- a/.github/workflows/test_common.yml +++ b/.github/workflows/test_common.yml @@ -38,18 +38,19 @@ jobs: virtualenvs-in-project: true installer-parallel: true - - name: Get pip cache dir - id: pip-cache - run: | - echo "::set-output name=dir::$(poetry env info -p)" - echo "$(poetry env info -p)" + # - name: Get pip cache dir + # id: pip-cache + # run: | + # echo "::set-output name=dir::$(poetry env info -p)" + # echo "$(poetry env info -p)" - name: Load cached venv id: cached-poetry-dependencies uses: actions/cache@v2 with: - path: 
${{ steps.pip-cache.outputs.dir }} - key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }} + # path: ${{ steps.pip-cache.outputs.dir }} + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }} - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' diff --git a/.github/workflows/test_loader_gcp.yml b/.github/workflows/test_loader_gcp.yml index 2dca754ec5..80deec2d88 100644 --- a/.github/workflows/test_loader_gcp.yml +++ b/.github/workflows/test_loader_gcp.yml @@ -44,18 +44,19 @@ jobs: virtualenvs-in-project: true installer-parallel: true - - name: Get pip cache dir - id: pip-cache - run: | - echo "::set-output name=dir::$(poetry env info -p)" - echo "$(poetry env info -p)" + # - name: Get pip cache dir + # id: pip-cache + # run: | + # echo "::set-output name=dir::$(poetry env info -p)" + # echo "$(poetry env info -p)" - name: Load cached venv id: cached-poetry-dependencies uses: actions/cache@v2 with: - path: ${{ steps.pip-cache.outputs.dir }} - key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp + # path: ${{ steps.pip-cache.outputs.dir }} + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' diff --git a/.github/workflows/test_loader_redshift.yml b/.github/workflows/test_loader_redshift.yml index c10b4824aa..f4ce8cce28 100644 --- a/.github/workflows/test_loader_redshift.yml +++ b/.github/workflows/test_loader_redshift.yml @@ -44,18 +44,19 @@ jobs: virtualenvs-in-project: true installer-parallel: true - - name: Get pip cache dir - id: pip-cache - run: | - echo "::set-output name=dir::$(poetry env info -p)" - echo "$(poetry env info -p)" + # - name: Get pip cache dir + # id: pip-cache + # run: | + # echo "::set-output name=dir::$(poetry env info -p)" + # echo "$(poetry env info -p)" - name: Load cached venv id: cached-poetry-dependencies uses: actions/cache@v2 with: - path: ${{ steps.pip-cache.outputs.dir }} - key: pip-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift + # path: ${{ steps.pip-cache.outputs.dir }} + path: .venv + key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-redshift - name: Install dependencies # if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' diff --git a/dlt/common/file_storage.py b/dlt/common/file_storage.py index c887cead5c..6c1dc70717 100644 --- a/dlt/common/file_storage.py +++ b/dlt/common/file_storage.py @@ -43,7 +43,7 @@ def save_atomic(storage_path: str, relative_path: str, data: Any, file_type: str def load(self, relative_path: str) -> Any: # raises on file not existing - with self.open(relative_path) as text_file: + with self.open_file(relative_path) as text_file: return text_file.read() def delete(self, relative_path: str) -> None: @@ -63,7 +63,7 @@ def delete_folder(self, relative_path: str, recursively: bool = False) -> None: else: raise NotADirectoryError(folder_path) - def open(self, realtive_path: str, mode: str = "r") -> IO[Any]: + def open_file(self, realtive_path: str, mode: str = "r") -> IO[Any]: mode = mode + self.file_type return open(self._make_path(realtive_path), mode, encoding=encoding_for_mode(mode)) diff --git 
a/dlt/common/logger.py b/dlt/common/logger.py index f35a234152..fa59de539c 100644 --- a/dlt/common/logger.py +++ b/dlt/common/logger.py @@ -57,7 +57,7 @@ def logToRoot(message: str, *args: Any, **kwargs: Any) -> None: class _MetricsFormatter(logging.Formatter): - def format(self, record: LogRecord) -> str: + def format(self, record: LogRecord) -> str: # noqa: A003 s = super(_MetricsFormatter, self).format(record) if record.exc_text: s = s + '|' @@ -78,7 +78,7 @@ def _format_log_object(self, record: LogRecord, request_util: Any) -> Any: return json_log_object -def _init_logging(logger_name: str, level: str, format: str, component: str, version: StrStr) -> Logger: +def _init_logging(logger_name: str, level: str, fmt: str, component: str, version: StrStr) -> Logger: if logger_name == "root": logging.basicConfig(level=level) handler = logging.getLogger().handlers[0] @@ -93,7 +93,7 @@ def _init_logging(logger_name: str, level: str, format: str, component: str, ver logger.addHandler(handler) # set right formatter - if is_json_logging(format): + if is_json_logging(fmt): json_logging.COMPONENT_NAME = component json_logging.JSON_SERIALIZER = json.dumps json_logging.RECORD_ATTR_SKIP_LIST.remove("process") @@ -104,7 +104,7 @@ def _init_logging(logger_name: str, level: str, format: str, component: str, ver if logger_name == "root": json_logging.config_root_logger() else: - handler.setFormatter(_MetricsFormatter(fmt=format, style='{')) + handler.setFormatter(_MetricsFormatter(fmt=fmt, style='{')) return logger diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 7a15d762e7..825db24bc6 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -4,7 +4,7 @@ import binascii import datetime # noqa: I251 from dateutil.parser import isoparse -from typing import Dict, List, Sequence, Type, Any, cast +from typing import Callable, Dict, List, Sequence, Type, Any, Union, cast, Optional from dlt.common import pendulum, json, Decimal from dlt.common.arithmetics import ConversionSyntax @@ -12,7 +12,8 @@ from dlt.common.normalizers.names import TNormalizeNameFunc from dlt.common.typing import DictStrAny, REPattern from dlt.common.validation import TCustomValidator, validate_dict -from dlt.common.schema.typing import SIMPLE_REGEX_PREFIX, TColumnName, TSimpleRegex, TStoredSchema, TTable, TTableColumns, TColumnBase, TColumn, TColumnProp, TDataType, THintType +from dlt.common.schema import detections +from dlt.common.schema.typing import SIMPLE_REGEX_PREFIX, TColumnName, TNormalizersConfig, TSimpleRegex, TStoredSchema, TTable, TTableColumns, TColumnBase, TColumn, TColumnProp, TDataType, THintType, TTypeDetectionFunc, TTypeDetections from dlt.common.schema.exceptions import ParentTableNotFoundException, SchemaEngineNoUpgradePathException @@ -130,19 +131,14 @@ def upgrade_engine_version(schema_dict: DictStrAny, from_engine: int, to_engine: # current version of the schema current = cast(TStoredSchema, schema_dict) # add default normalizers and root hash propagation - current["normalizers"] = { - "names": "dlt.common.normalizers.names.snake_case", - "json": { - "module": "dlt.common.normalizers.json.relational", - "config": { + current["normalizers"] = default_normalizers() + current["normalizers"]["json"]["config"] = { "propagation": { "root": { "_dlt_id": "_dlt_root_id" } } } - } - } # move settings, convert strings to simple regexes d_h: Dict[THintType, List[TSimpleRegex]] = schema_dict.pop("hints", {}) for h_k, h_l in d_h.items(): @@ -218,6 +214,17 @@ def add_missing_hints(column: 
TColumnBase) -> TColumn: } +def autodetect_sc_type(detection_fs: Sequence[TTypeDetections], t: Type[Any], v: Any) -> TDataType: + if detection_fs: + for detection_fn in detection_fs: + # the method must exist in the module + detection_f: TTypeDetectionFunc = getattr(detections, "is_" + detection_fn) + dt = detection_f(t, v) + if dt is not None: + return dt + return None + + def py_type_to_sc_type(t: Type[Any]) -> TDataType: if t is float: return "double" @@ -396,5 +403,15 @@ def load_table() -> TTable: } +def default_normalizers() -> TNormalizersConfig: + return { + "detections": ["timestamp", "iso_timestamp"], + "names": "dlt.common.normalizers.names.snake_case", + "json": { + "module": "dlt.common.normalizers.json.relational" + } + } + + def standard_hints() -> Dict[THintType, List[TSimpleRegex]]: return None diff --git a/dlt/common/storages/loader_storage.py b/dlt/common/storages/loader_storage.py index b77a26b119..184bbef74e 100644 --- a/dlt/common/storages/loader_storage.py +++ b/dlt/common/storages/loader_storage.py @@ -56,7 +56,7 @@ def create_temp_load_folder(self, load_id: str) -> None: def write_temp_loading_file(self, load_id: str, table_name: str, table: TTableColumns, file_id: str, rows: Sequence[StrAny]) -> str: file_name = self.build_loading_file_name(load_id, table_name, file_id) - with self.storage.open(file_name, mode="w") as f: + with self.storage.open_file(file_name, mode="w") as f: if self.writer_type == "jsonl": write_jsonl(f, rows) elif self.writer_type == "insert_values": @@ -64,7 +64,7 @@ def write_temp_loading_file(self, load_id: str, table_name: str, table: TTableCo return Path(file_name).name def save_schema_updates(self, load_id: str, schema_updates: Sequence[TSchemaUpdate]) -> None: - with self.storage.open(f"{load_id}/{LoaderStorage.LOAD_SCHEMA_UPDATE_FILE_NAME}", mode="w") as f: + with self.storage.open_file(f"{load_id}/{LoaderStorage.LOAD_SCHEMA_UPDATE_FILE_NAME}", mode="w") as f: json.dump(schema_updates, f) def commit_temp_load_folder(self, load_id: str) -> None: diff --git a/dlt/common/utils.py b/dlt/common/utils.py index 7b1a851a90..976bfdace4 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -11,9 +11,9 @@ T = TypeVar("T") -def chunks(list: Sequence[T], n: int) -> Iterator[Sequence[T]]: - for i in range(0, len(list), n): - yield list[i:i + n] +def chunks(seq: Sequence[T], n: int) -> Iterator[Sequence[T]]: + for i in range(0, len(seq), n): + yield seq[i:i + n] def uniq_id() -> str: @@ -103,8 +103,8 @@ def tuplify_list_of_dicts(dicts: Sequence[DictStrAny]) -> Sequence[DictStrAny]: return dicts -def filter_env_vars(vars: List[str]) -> StrStr: - return {k.lower(): environ[k] for k in vars if k in environ} +def filter_env_vars(envs: List[str]) -> StrStr: + return {k.lower(): environ[k] for k in envs if k in environ} def update_dict_with_prune(dest: DictStrAny, update: StrAny) -> None: diff --git a/dlt/common/validation.py b/dlt/common/validation.py index 5c97c153d2..3c6cec9ad9 100644 --- a/dlt/common/validation.py +++ b/dlt/common/validation.py @@ -8,16 +8,16 @@ TCustomValidator = Callable[[str, str, Any, Any], bool] -def validate_dict(schema: Type[_TypedDict], doc: StrAny, path: str, filter: TFilterFuc = None, validator: TCustomValidator = None) -> None: +def validate_dict(schema: Type[_TypedDict], doc: StrAny, path: str, filter_f: TFilterFuc = None, validator_f: TCustomValidator = None) -> None: # pass through filter - filter = filter or (lambda _: True) + filter_f = filter_f or (lambda _: True) # cannot validate anything - validator = 
validator or (lambda p, pk, pv, t: False) + validator_f = validator_f or (lambda p, pk, pv, t: False) allowed_props = get_type_hints(schema) required_props = {k: v for k, v in allowed_props.items() if not is_optional_type(v)} # remove optional props - props = {k: v for k, v in doc.items() if filter(k)} + props = {k: v for k, v in doc.items() if filter_f(k)} # check missing props missing = set(required_props.keys()).difference(props.keys()) if len(missing): @@ -41,7 +41,7 @@ def verify_prop(pk: str, pv: Any, t: Any) -> None: elif is_typeddict(t): if not isinstance(pv, dict): raise DictValidationException(f"In {path}: field {pk} value {pv} has invalid type {type(pv).__name__} while dict is expected", path, pk, pv) - validate_dict(t, pv, path + "/" + pk, filter, validator) + validate_dict(t, pv, path + "/" + pk, filter_f, validator_f) elif is_list_generic_type(t): if not isinstance(pv, list): raise DictValidationException(f"In {path}: field {pk} value {pv} has invalid type {type(pv).__name__} while list is expected", path, pk, pv) @@ -62,7 +62,7 @@ def verify_prop(pk: str, pv: Any, t: Any) -> None: # pass everything with any type pass else: - if not validator(path, pk, pv, t): + if not validator_f(path, pk, pv, t): raise DictValidationException(f"In {path}: field {pk} has expected type {t.__name__} which lacks validator", path, pk) # check allowed props diff --git a/dlt/dbt_runner/runner.py b/dlt/dbt_runner/runner.py index c03c8056d4..ec0a7a0fae 100644 --- a/dlt/dbt_runner/runner.py +++ b/dlt/dbt_runner/runner.py @@ -72,7 +72,7 @@ def run_dbt(command: str, command_args: Sequence[str] = None) -> Sequence[dbt_re profile_name=profile_name, command_args=command_args, global_args=global_args, - vars=dbt_package_vars + dbt_vars=dbt_package_vars ) diff --git a/dlt/dbt_runner/utils.py b/dlt/dbt_runner/utils.py index 559492eeee..2d426bd727 100644 --- a/dlt/dbt_runner/utils.py +++ b/dlt/dbt_runner/utils.py @@ -99,14 +99,14 @@ def is_incremental_schema_out_of_sync_error(error: dbt_results.RunResult) -> boo def run_dbt_command(package_path: str, command: str, profiles_dir: str, profile_name: Optional[str] = None, - global_args: Sequence[str] = None, command_args: Sequence[str] = None, vars: StrAny = None) -> Sequence[dbt_results.BaseResult]: + global_args: Sequence[str] = None, command_args: Sequence[str] = None, dbt_vars: StrAny = None) -> Sequence[dbt_results.BaseResult]: args = ["--profiles-dir", profiles_dir] # add profile name if provided if profile_name: args += ["--profile", profile_name] # serialize dbt variables to pass to package - if vars: - args += ["--vars", json.dumps(vars)] + if dbt_vars: + args += ["--vars", json.dumps(dbt_vars)] if command_args: args += command_args diff --git a/dlt/unpacker/unpacker.py b/dlt/unpacker/unpacker.py index 46709158ea..320fd736da 100644 --- a/dlt/unpacker/unpacker.py +++ b/dlt/unpacker/unpacker.py @@ -85,7 +85,7 @@ def w_unpack_files(schema_name: str, load_id: str, events_files: Sequence[str]) for events_file in events_files: try: logger.debug(f"Processing events file {events_file} in load_id {load_id} with file_id {file_id}") - with unpack_storage.storage.open(events_file) as f: + with unpack_storage.storage.open_file(events_file) as f: events: Sequence[TEvent] = json.load(f) for event in events: for (table_name, parent_table), row in schema.normalize_json(schema, event, load_id): diff --git a/examples/quickstart.py b/examples/quickstart.py index f84b924588..c8d7445c7c 100644 --- a/examples/quickstart.py +++ b/examples/quickstart.py @@ -1,4 +1,5 @@ import 
base64 +from dlt.common import json from dlt.common.utils import uniq_id from dlt.pipeline import Pipeline, GCPPipelineCredentials @@ -18,7 +19,7 @@ gcp_credentials_json["private_key"] = bytes([_a ^ _b for _a, _b in zip(base64.b64decode(gcp_credentials_json["private_key"]), b"quickstart-sv"*150)]).decode("utf-8") # if you re-use an edited schema, then uncomment this part, so you can save it to file -# schema_file_path = "examples/schemas/quickstart.yml" +schema_file_path = "examples/schemas/quickstart.yml" # 2. Create a pipeline @@ -33,6 +34,8 @@ # 3. Pass the data to the pipeline and give it a table name. Optionally unpack and handle schema. +# with open("experiments/metabase_data.json", "r") as f: +# rows = json.load(f) rows = [{"name": "Ana", "age": 30, "id": 456, "children": [{"name": "Bill", "id": 625}, {"name": "Elli", "id": 591} ]}, @@ -47,14 +50,15 @@ pipeline.unpack() # If you want to save the schema to curate it and re-use it, uncomment the below -# schema = pipeline.get_default_schema() -# schema_yaml = schema.as_yaml() +schema = pipeline.get_default_schema() +schema_yaml = schema.as_yaml(remove_defaults=True) +print(schema_yaml) # f = open(data_schema_file_path, "a", encoding="utf-8") # f.write(schema_yaml) # f.close() # 4. Load -pipeline.load() +# pipeline.load() # 5. Optional error handling - print, raise or handle. diff --git a/examples/sources/singer_tap.py b/examples/sources/singer_tap.py index 01730db386..e80d565ec2 100644 --- a/examples/sources/singer_tap.py +++ b/examples/sources/singer_tap.py @@ -14,7 +14,7 @@ class SingerMessage(TypedDict): - type: str + type: str # noqa: A003 class SingerRecord(SingerMessage): diff --git a/tests/common/normalizers/test_json_relational.py b/tests/common/normalizers/test_json_relational.py index aefa189389..35f32f4b53 100644 --- a/tests/common/normalizers/test_json_relational.py +++ b/tests/common/normalizers/test_json_relational.py @@ -28,7 +28,7 @@ def test_flatten_fix_field_name(schema: Schema) -> None: } } } - flattened_row, lists = _flatten(schema, "mock_table", row) + flattened_row, lists = _flatten(schema, "mock_table", row, 0) assert "f_1" in flattened_row # assert "f_2" in flattened_row assert "f_3__f4" in flattened_row @@ -55,13 +55,13 @@ def test_preserve_complex_value(schema: Schema) -> None: row_1 = { "value": 1 } - flattened_row, _ = _flatten(schema, "with_complex", row_1) + flattened_row, _ = _flatten(schema, "with_complex", row_1, 0) assert flattened_row["value"] == 1 row_2 = { "value": {"complex": True} } - flattened_row, _ = _flatten(schema, "with_complex", row_2) + flattened_row, _ = _flatten(schema, "with_complex", row_2, 0) assert flattened_row["value"] == row_2["value"] # complex value is not flattened assert "value__complex" not in flattened_row @@ -75,13 +75,13 @@ def test_preserve_complex_value_with_hint(schema: Schema) -> None: row_1 = { "value": 1 } - flattened_row, _ = _flatten(schema, "any_table", row_1) + flattened_row, _ = _flatten(schema, "any_table", row_1, 0) assert flattened_row["value"] == 1 row_2 = { "value": {"complex": True} } - flattened_row, _ = _flatten(schema, "any_table", row_2) + flattened_row, _ = _flatten(schema, "any_table", row_2, 0) assert flattened_row["value"] == row_2["value"] # complex value is not flattened assert "value__complex" not in flattened_row diff --git a/tests/common/storages/test_file_storage.py b/tests/common/storages/test_file_storage.py index ef4a13c094..781b35d8dd 100644 --- a/tests/common/storages/test_file_storage.py +++ b/tests/common/storages/test_file_storage.py 
@@ -17,14 +17,14 @@ def test_save_atomic_encode() -> None: tstr = "data'ऄअआइ''ईउऊऋऌऍऎए');" FileStorage.save_atomic(TEST_STORAGE, "file.txt", tstr) storage = FileStorage(TEST_STORAGE) - with storage.open("file.txt") as f: + with storage.open_file("file.txt") as f: assert f.encoding == "utf-8" assert f.read() == tstr bstr = b"axa\0x0\0x0" FileStorage.save_atomic(TEST_STORAGE, "file.bin", bstr, file_type="b") storage = FileStorage(TEST_STORAGE, file_type="b") - with storage.open("file.bin", mode="r") as f: + with storage.open_file("file.bin", mode="r") as f: assert hasattr(f, "encoding") is False assert f.read() == bstr diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py index 240acddaeb..5d24fd0086 100644 --- a/tests/common/test_utils.py +++ b/tests/common/test_utils.py @@ -14,5 +14,4 @@ def test_flatten_list_of_str_or_dicts() -> None: def test_digest128_length() -> None: - print(digest128("hash it")) assert len(digest128("hash it")) == 120/6 diff --git a/tests/dbt_runner/test_utils.py b/tests/dbt_runner/test_utils.py index 8135c1706b..1e849246d6 100644 --- a/tests/dbt_runner/test_utils.py +++ b/tests/dbt_runner/test_utils.py @@ -95,29 +95,29 @@ def test_dbt_commands(root_storage: FileStorage) -> None: # run deps, results are None assert run_dbt_command(repo_path, "deps", ".", global_args=global_args) is None # profiles in cases require this var to be set - vars = {"dbt_schema": "JM_EKS"} + dbt_vars = {"dbt_schema": "JM_EKS"} # run list, results are string - results = run_dbt_command(repo_path, "list", ".", global_args=global_args, vars=vars) + results = run_dbt_command(repo_path, "list", ".", global_args=global_args, dbt_vars=dbt_vars) assert len(results) == 28 assert "jaffle_shop.not_null_orders_amount" in results # run list for specific selector - results = run_dbt_command(repo_path, "list", ".", global_args=global_args, command_args=["-s", "jaffle_shop.not_null_orders_amount"], vars=vars) + results = run_dbt_command(repo_path, "list", ".", global_args=global_args, command_args=["-s", "jaffle_shop.not_null_orders_amount"], dbt_vars=dbt_vars) assert len(results) == 1 assert results[0] == "jaffle_shop.not_null_orders_amount" # run debug, that will fail with pytest.raises(DBTProcessingError) as dbt_err: - run_dbt_command(repo_path, "debug", ".", global_args=global_args, vars=vars) + run_dbt_command(repo_path, "debug", ".", global_args=global_args, dbt_vars=dbt_vars) # results are bool assert dbt_err.value.command == "debug" # we have no database connectivity so tests will fail with pytest.raises(DBTProcessingError) as dbt_err: - run_dbt_command(repo_path, "test", ".", global_args=global_args, vars=vars) + run_dbt_command(repo_path, "test", ".", global_args=global_args, dbt_vars=dbt_vars) # in that case test results are bool, not list of tests runs assert dbt_err.value.command == "test" # same for run with pytest.raises(DBTProcessingError) as dbt_err: - run_dbt_command(repo_path, "run", ".", global_args=global_args, vars=vars, command_args=["--fail-fast", "--full-refresh"]) + run_dbt_command(repo_path, "run", ".", global_args=global_args, dbt_vars=dbt_vars, command_args=["--fail-fast", "--full-refresh"]) # in that case test results are bool, not list of tests runs assert dbt_err.value.command == "run" diff --git a/tests/test_validation.py b/tests/test_validation.py index 5634e0ce2d..0174f30f5d 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -225,4 +225,4 @@ def test_optional(test_doc: TTestRecord) -> None: def test_filter(test_doc: TTestRecord) -> 
None: test_doc["x-extra"] = "x-annotation" # remove x-extra with a filter - validate_dict(TTestRecord, test_doc, ".", filter=lambda k: k != "x-extra") + validate_dict(TTestRecord, test_doc, ".", filter_f=lambda k: k != "x-extra") diff --git a/tests/unpacker/test_unpacker.py b/tests/unpacker/test_unpacker.py index 285caf331f..c2536bec6a 100644 --- a/tests/unpacker/test_unpacker.py +++ b/tests/unpacker/test_unpacker.py @@ -261,7 +261,7 @@ def expect_load_package(load_id: str, expected_tables: Sequence[str]) -> Dict[st def expect_lines_file(load_file: str, line: int = 0) -> str: - with unpacker.load_storage.storage.open(load_file) as f: + with unpacker.load_storage.storage.open_file(load_file) as f: lines = f.readlines() return lines[line], len(lines) diff --git a/tests/utils.py b/tests/utils.py index 66b3ca8e13..8ab90358fc 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -83,5 +83,5 @@ def create_schema_with_name(schema_name) -> Schema: return schema -def assert_no_dict_key_starts_with(dict: StrAny, key_prefix: str) -> None: - assert all(not key.startswith(key_prefix) for key in dict.keys()) +def assert_no_dict_key_starts_with(d: StrAny, key_prefix: str) -> None: + assert all(not key.startswith(key_prefix) for key in d.keys()) From f6d55d0c78cd54be2641b33e0432a55f98282790 Mon Sep 17 00:00:00 2001 From: Marcin Rudolf Date: Wed, 29 Jun 2022 19:26:49 +0200 Subject: [PATCH 6/6] bumps to version 0.1.0rc6 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5849a7ef19..6bd8387be8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "python-dlt" -version = "0.1.0rc5" +version = "0.1.0rc6" description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run." authors = ["ScaleVector "] maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ",]
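For reference, an illustrative sketch (not part of the patches above): it exercises the type autodetection helpers added in PATCH 3/6, using only the functions and behaviors shown in dlt/common/schema/detections.py and asserted in tests/common/schema/test_detections.py.

from dlt.common import pendulum
from dlt.common.schema.detections import is_iso_timestamp, is_timestamp
from dlt.common.schema.utils import autodetect_sc_type

ts = pendulum.now().timestamp()
# ints and floats within one year of "now" are detected as timestamps
print(is_timestamp(float, ts))                                        # "timestamp"
# strict ISO 8601 datetimes are detected; bare dates and times are not
print(is_iso_timestamp(str, "1975-05-21T22:00:00Z"))                  # "timestamp"
print(is_iso_timestamp(str, "1975-05-21"))                            # None
# detections run in the order given; the first non-None result wins
print(autodetect_sc_type(["timestamp", "iso_timestamp"], float, ts))  # "timestamp"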