diff --git a/dlt/cli/init_command.py b/dlt/cli/init_command.py index 9d7172f548..6f5e561c7f 100644 --- a/dlt/cli/init_command.py +++ b/dlt/cli/init_command.py @@ -11,13 +11,13 @@ from dlt.common.configuration.providers import CONFIG_TOML, SECRETS_TOML, ConfigTomlProvider, SecretsTomlProvider from dlt.common.normalizers import default_normalizers, import_normalizers from dlt.common.pipeline import get_dlt_repos_dir +from dlt.common.source import _SOURCES from dlt.version import DLT_PKG_NAME, __version__ from dlt.common.destination.reference import DestinationReference from dlt.common.reflection.utils import rewrite_python_script from dlt.common.schema.exceptions import InvalidSchemaName from dlt.common.storages.file_storage import FileStorage -from dlt.extract.decorators import _SOURCES import dlt.reflection.names as n from dlt.reflection.script_inspector import inspect_pipeline_script, load_script_module diff --git a/dlt/cli/pipeline_command.py b/dlt/cli/pipeline_command.py index 19724fa60c..0d74565fdd 100644 --- a/dlt/cli/pipeline_command.py +++ b/dlt/cli/pipeline_command.py @@ -4,7 +4,7 @@ from dlt.cli.exceptions import CliCommandException from dlt.common import json -from dlt.common.pipeline import _resource_state, get_dlt_pipelines_dir, TSourceState +from dlt.common.pipeline import resource_state, get_dlt_pipelines_dir, TSourceState from dlt.common.destination.reference import TDestinationReferenceArg from dlt.common.runners import Venv from dlt.common.runners.stdout import iter_stdout @@ -97,8 +97,8 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver if sources_state: source_state = next(iter(sources_state.items()))[1] if is_single_schema else sources_state.get(schema_name) if source_state: - resource_state = _resource_state(resource_name, source_state) - res_state_slots = len(resource_state) + resource_state_ = resource_state(resource_name, source_state) + res_state_slots = len(resource_state_) fmt.echo("%s with %s table(s) and %s resource state slot(s)" % (fmt.bold(resource_name), fmt.bold(str(len(tables))), fmt.bold(str(res_state_slots)))) fmt.echo() fmt.echo("Working dir content:") diff --git a/dlt/cli/source_detection.py b/dlt/cli/source_detection.py index da3b4f1163..369663b82f 100644 --- a/dlt/cli/source_detection.py +++ b/dlt/cli/source_detection.py @@ -7,10 +7,10 @@ from dlt.common.configuration.specs import BaseConfiguration from dlt.common.reflection.utils import creates_func_def_name_node from dlt.common.typing import is_optional_type +from dlt.common.source import SourceInfo from dlt.cli.config_toml_writer import WritableConfigValue from dlt.cli.exceptions import CliCommandException -from dlt.extract.decorators import SourceInfo from dlt.reflection.script_visitor import PipelineScriptVisitor diff --git a/dlt/common/configuration/accessors.py b/dlt/common/configuration/accessors.py index 47b7f055cf..cf71db7030 100644 --- a/dlt/common/configuration/accessors.py +++ b/dlt/common/configuration/accessors.py @@ -26,6 +26,11 @@ def __getitem__(self, field: str) -> Any: else: return value + def __setitem__(self, field: str, value: Any) -> None: + sections = field.split(".") + key = sections.pop() + self.writable_provider.set_value(key, value, None, *sections) + def get(self, field: str, expected_type: Type[TConfigAny] = None) -> TConfigAny: value: TConfigAny value, _ = self._get_value(field, expected_type) @@ -47,6 +52,11 @@ def config_providers(self) -> Sequence[ConfigProvider]: def default_type(self) -> AnyType: pass + @property + 
@abc.abstractmethod + def writable_provider(self) -> ConfigProvider: + pass + def _get_providers_from_context(self) -> Sequence[ConfigProvider]: return Container()[ConfigProvidersContext].providers @@ -87,6 +97,11 @@ def config_providers(self) -> Sequence[ConfigProvider]: def default_type(self) -> AnyType: return AnyType + @property + def writable_provider(self) -> ConfigProvider: + """find first writable provider that does not support secrets - should be config.toml""" + return next(p for p in self._get_providers_from_context() if p.is_writable and not p.supports_secrets) + value: ClassVar[None] = ConfigValue "A placeholder that tells dlt to replace it with actual config value during the call to a source or resource decorated function." @@ -103,6 +118,11 @@ def config_providers(self) -> Sequence[ConfigProvider]: def default_type(self) -> AnyType: return TSecretValue + @property + def writable_provider(self) -> ConfigProvider: + """find first writable provider that supports secrets - should be secrets.toml""" + return next(p for p in self._get_providers_from_context() if p.is_writable and p.supports_secrets) + value: ClassVar[None] = ConfigValue "A placeholder that tells dlt to replace it with actual secret during the call to a source or resource decorated function." diff --git a/dlt/common/configuration/providers/google_secrets.py b/dlt/common/configuration/providers/google_secrets.py index 9a85ee7f1d..02ec76f026 100644 --- a/dlt/common/configuration/providers/google_secrets.py +++ b/dlt/common/configuration/providers/google_secrets.py @@ -138,36 +138,7 @@ def _update_from_vault(self, full_key: str, key: str, hint: type, pipeline_name: secret = self._look_vault(full_key, hint) self._vault_lookups[full_key] = pendulum.now() if secret is not None: - self._add_node(secret, key, pipeline_name, sections) - - def _add_node(self, secret: str, key: str, pipeline_name: str, sections: Tuple[str, ...]) -> None: - if pipeline_name: - sections = (pipeline_name, ) + sections - - doc: Any = auto_cast(secret) - if isinstance(doc, TOMLContainer): - if key is None: - self._toml = doc - else: - # always update the top document - update_dict_nested(self._toml, doc) - else: - if key is None: - raise ValueError("dlt_secrets_toml must contain toml document") - - master: TOMLContainer - # descend from root, create tables if necessary - master = self._toml - for k in sections: - if not isinstance(master, dict): - raise KeyError(k) - if k not in master: - master[k] = tomlkit.table() - master = master[k] # type: ignore - if isinstance(doc, dict): - update_dict_nested(master[key], doc) # type: ignore - else: - master[key] = doc + self.set_value(key, auto_cast(secret), pipeline_name, *sections) @property def is_empty(self) -> bool: diff --git a/dlt/common/configuration/providers/provider.py b/dlt/common/configuration/providers/provider.py index 420cfaf5e9..c6bfea5dc3 100644 --- a/dlt/common/configuration/providers/provider.py +++ b/dlt/common/configuration/providers/provider.py @@ -10,6 +10,9 @@ class ConfigProvider(abc.ABC): def get_value(self, key: str, hint: Type[Any], pipeline_name: str, *sections: str) -> Tuple[Optional[Any], str]: pass + def set_value(self, key: str, value: Any, pipeline_name: str, *sections: str) -> None: + raise NotImplementedError() + @property @abc.abstractmethod def supports_secrets(self) -> bool: @@ -29,6 +32,9 @@ def name(self) -> str: def is_empty(self) -> bool: return False + @property + def is_writable(self) -> bool: + return False def get_key_name(key: str, separator: str, /, 
*sections: str) -> str: diff --git a/dlt/common/configuration/providers/toml.py b/dlt/common/configuration/providers/toml.py index 7efd98e7e6..fceb7e433a 100644 --- a/dlt/common/configuration/providers/toml.py +++ b/dlt/common/configuration/providers/toml.py @@ -36,6 +36,35 @@ def get_value(self, key: str, hint: Type[Any], pipeline_name: str, *sections: st except KeyError: return None, full_key + def set_value(self, key: str, value: Any, pipeline_name: str, *sections: str) -> None: + if pipeline_name: + sections = (pipeline_name, ) + sections + + if isinstance(value, TOMLContainer): + if key is None: + self._toml = value + else: + # always update the top document + # TODO: verify that value contains only the elements under key + update_dict_nested(self._toml, value) + else: + if key is None: + raise ValueError("dlt_secrets_toml must contain toml document") + + master: TOMLContainer + # descend from root, create tables if necessary + master = self._toml + for k in sections: + if not isinstance(master, dict): + raise KeyError(k) + if k not in master: + master[k] = tomlkit.table() + master = master[k] # type: ignore + if isinstance(value, dict) and isinstance(master.get(key), dict): + update_dict_nested(master[key], value) # type: ignore + else: + master[key] = value + @property def supports_sections(self) -> bool: return True @@ -110,6 +139,9 @@ def name(self) -> str: def supports_secrets(self) -> bool: return False + @property + def is_writable(self) -> bool: + return True class SecretsTomlProvider(TomlFileProvider): @@ -125,6 +157,10 @@ def name(self) -> str: def supports_secrets(self) -> bool: return True + @property + def is_writable(self) -> bool: + return True + class TomlProviderReadException(ConfigProviderException): def __init__(self, provider_name: str, file_name: str, full_path: str, toml_exception: str) -> None: diff --git a/dlt/common/configuration/specs/__init__.py b/dlt/common/configuration/specs/__init__.py index f23950dec7..635b3434d8 100644 --- a/dlt/common/configuration/specs/__init__.py +++ b/dlt/common/configuration/specs/__init__.py @@ -1,8 +1,5 @@ from .run_configuration import RunConfiguration # noqa: F401 from .base_configuration import BaseConfiguration, CredentialsConfiguration, CredentialsWithDefault, ContainerInjectableContext, extract_inner_hint, is_base_configuration_inner_hint, configspec # noqa: F401 -from .normalize_volume_configuration import NormalizeVolumeConfiguration # noqa: F401 -from .load_volume_configuration import LoadVolumeConfiguration # noqa: F401 -from .schema_volume_configuration import SchemaVolumeConfiguration, TSchemaFileFormat # noqa: F401 from .config_section_context import ConfigSectionContext # noqa: F401 from .gcp_credentials import GcpServiceAccountCredentialsWithoutDefaults, GcpServiceAccountCredentials, GcpOAuthCredentialsWithoutDefaults, GcpOAuthCredentials, GcpCredentials # noqa: F401 diff --git a/dlt/common/configuration/specs/connection_string_credentials.py b/dlt/common/configuration/specs/connection_string_credentials.py index 828e4a5449..386535122b 100644 --- a/dlt/common/configuration/specs/connection_string_credentials.py +++ b/dlt/common/configuration/specs/connection_string_credentials.py @@ -16,7 +16,7 @@ class ConnectionStringCredentials(CredentialsConfiguration): port: Optional[int] = None query: Optional[Dict[str, str]] = None - __config_gen_annotations__: ClassVar[List[str]] = ["port"] + __config_gen_annotations__: ClassVar[List[str]] = ["port", "password", "host"] def parse_native_representation(self, native_value: 
Any) -> None: if not isinstance(native_value, str): diff --git a/dlt/common/configuration/specs/load_volume_configuration.py b/dlt/common/configuration/specs/load_volume_configuration.py deleted file mode 100644 index c014a66d43..0000000000 --- a/dlt/common/configuration/specs/load_volume_configuration.py +++ /dev/null @@ -1,13 +0,0 @@ -from typing import TYPE_CHECKING - -from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec - - -@configspec(init=True) -class LoadVolumeConfiguration(BaseConfiguration): - load_volume_path: str = None # path to volume where files to be loaded to analytical storage are stored - delete_completed_jobs: bool = False # if set to true the folder with completed jobs will be deleted - - if TYPE_CHECKING: - def __init__(self, load_volume_path: str = None, delete_completed_jobs: bool = None) -> None: - ... diff --git a/dlt/common/configuration/specs/normalize_volume_configuration.py b/dlt/common/configuration/specs/normalize_volume_configuration.py deleted file mode 100644 index 49aa40df40..0000000000 --- a/dlt/common/configuration/specs/normalize_volume_configuration.py +++ /dev/null @@ -1,12 +0,0 @@ -from typing import TYPE_CHECKING - -from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec - - -@configspec(init=True) -class NormalizeVolumeConfiguration(BaseConfiguration): - normalize_volume_path: str = None # path to volume where normalized loader files will be stored - - if TYPE_CHECKING: - def __init__(self, normalize_volume_path: str = None) -> None: - ... diff --git a/dlt/common/configuration/specs/schema_volume_configuration.py b/dlt/common/configuration/specs/schema_volume_configuration.py deleted file mode 100644 index 4d02bde973..0000000000 --- a/dlt/common/configuration/specs/schema_volume_configuration.py +++ /dev/null @@ -1,19 +0,0 @@ -from typing import Optional, Literal, TYPE_CHECKING, get_args - -from dlt.common.configuration.specs.base_configuration import BaseConfiguration, configspec - -TSchemaFileFormat = Literal["json", "yaml"] -SchemaFileExtensions = get_args(TSchemaFileFormat) - - -@configspec(init=True) -class SchemaVolumeConfiguration(BaseConfiguration): - schema_volume_path: str = None # path to volume with default schemas - import_schema_path: Optional[str] = None # import schema from external location - export_schema_path: Optional[str] = None # export schema to external location - external_schema_format: TSchemaFileFormat = "yaml" # format in which to expect external schema - external_schema_format_remove_defaults: bool = True # remove default values when exporting schema - - if TYPE_CHECKING: - def __init__(self, schema_volume_path: str = None, import_schema_path: str = None, export_schema_path: str = None) -> None: - ... diff --git a/dlt/common/exceptions.py b/dlt/common/exceptions.py index a7f6c5d2b8..2834d1aae7 100644 --- a/dlt/common/exceptions.py +++ b/dlt/common/exceptions.py @@ -154,6 +154,12 @@ def __init__(self, source_name: Optional[str] = None) -> None: super().__init__(None, msg) +class ResourceNameNotAvailable(PipelineException): + def __init__(self) -> None: + super().__init__(None, + "A resource state was requested but no resource marked callable was found in the call stack. Resource state may be only requested from @dlt.resource decorated function or with explicit resource name.") + + class SourceSectionNotAvailable(PipelineException): def __init__(self) -> None: msg = "Access to state was requested without source section active. 
State should be requested from within the @dlt.source and @dlt.resource decorated function." diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py index d90f1bf5ef..4fc170a0d0 100644 --- a/dlt/common/pipeline.py +++ b/dlt/common/pipeline.py @@ -1,10 +1,11 @@ import os import datetime # noqa: 251 import humanize +import inspect import contextlib from typing import Any, Callable, ClassVar, Dict, List, NamedTuple, Optional, Protocol, Sequence, TYPE_CHECKING, Tuple, TypedDict -from dlt.common import pendulum +from dlt.common import pendulum, logger from dlt.common.configuration import configspec from dlt.common.configuration import known_sections from dlt.common.configuration.container import Container @@ -14,9 +15,10 @@ from dlt.common.configuration.paths import get_dlt_data_dir from dlt.common.configuration.specs import RunConfiguration from dlt.common.destination.reference import DestinationReference, TDestinationReferenceArg -from dlt.common.exceptions import DestinationHasFailedJobs, PipelineStateNotAvailable, SourceSectionNotAvailable +from dlt.common.exceptions import DestinationHasFailedJobs, PipelineStateNotAvailable, ResourceNameNotAvailable, SourceSectionNotAvailable from dlt.common.schema import Schema from dlt.common.schema.typing import TColumnKey, TColumnSchema, TWriteDisposition +from dlt.common.source import get_current_pipe_name from dlt.common.storages.load_storage import LoadPackageInfo from dlt.common.typing import DictStrAny, REPattern from dlt.common.jsonpath import delete_matches, TAnyJsonPath @@ -292,10 +294,11 @@ def sources_state(pipeline_state_: Optional[TPipelineState] = None, /) -> DictSt def source_state() -> DictStrAny: - """Returns a dictionary with the source state. Such state is preserved across pipeline runs and may be used to implement incremental loads. + """Returns a dictionary with the source-scoped state. Source-scoped state may be shared across the resources of a particular source. Please avoid using source scoped state. Check + the `resource_state` function for resource-scoped state that is visible within particular resource. Dlt state is preserved across pipeline runs and may be used to implement incremental loads. ### Summary - The state is a python dictionary-like object that is available within the `@dlt.source` and `@dlt.resource` decorated functions and may be read and written to. + The source state is a python dictionary-like object that is available within the `@dlt.source` and `@dlt.resource` decorated functions and may be read and written to. The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time. When using the state: * The source state is scoped to a section of the source. The source section is set by default to the module name in which source function is defined. @@ -303,27 +306,6 @@ def source_state() -> DictStrAny: * Any JSON-serializable values can be written and the read from the state. `dlt` dumps and restores instances of Python bytes, DateTime, Date and Decimal types. * The state available in the source decorated function is read only and any changes will be discarded. * The state available in the resource decorated function is writable and written values will be available on the next pipeline run - - ### Example - The most typical use case for the state is to implement incremental load. 
-    >>> @dlt.resource(write_disposition="append")
-    >>> def players_games(chess_url, players, start_month=None, end_month=None):
-    >>>     checked_archives = dlt.current.state().setdefault("archives", [])
-    >>>     archives = players_archives(chess_url, players)
-    >>>     for url in archives:
-    >>>         if url in checked_archives:
-    >>>             print(f"skipping archive {url}")
-    >>>             continue
-    >>>         else:
-    >>>             print(f"getting archive {url}")
-    >>>             checked_archives.append(url)
-    >>>         # get the filtered archive
-    >>>         r = requests.get(url)
-    >>>         r.raise_for_status()
-    >>>         yield r.json().get("games", [])
-
-    Here we store all the urls with game archives in the state and we skip loading them on next run. The archives are immutable. The state will grow with the coming months (and more players).
-    Up to few thousand archives we should be good though.
     """
     global _last_full_state
 
@@ -359,11 +341,56 @@ def _delete_source_state_keys(key: TAnyJsonPath, source_state_: Optional[DictStr
     delete_matches(key, state_)
 
 
-def _resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> DictStrAny:
-    """Alpha version of the resource state, the signature will change.
-    Returns resource-scoped state.
+def resource_state(resource_name: str = None, source_state_: Optional[DictStrAny] = None, /) -> DictStrAny:
+    """Returns a dictionary with the resource-scoped state. Resource-scoped state is visible only to the resource requesting it. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.
+
+    Note that this function accepts the resource name as an optional argument. There are rare cases when `dlt` is not able to resolve the resource name because the requesting
+    function runs in a different thread than the main one. You'll need to pass the name explicitly when you request resource_state from async functions or functions decorated with @defer.
+
+    ### Summary
+    The resource state is a python dictionary-like object that is available within the `@dlt.resource` decorated functions and may be read and written to.
+    The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
+    When using the state:
+    * The resource state is scoped to a particular resource requesting it.
+    * Any JSON-serializable values can be written and read from the state. `dlt` dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
+    * The state available in the resource decorated function is writable and written values will be available on the next pipeline run
+
+    ### Example
+    The most typical use case for the state is to implement incremental load.
+    >>> @dlt.resource(write_disposition="append")
+    >>> def players_games(chess_url, players, start_month=None, end_month=None):
+    >>>     checked_archives = dlt.current.resource_state().setdefault("archives", [])
+    >>>     archives = players_archives(chess_url, players)
+    >>>     for url in archives:
+    >>>         if url in checked_archives:
+    >>>             print(f"skipping archive {url}")
+    >>>             continue
+    >>>         else:
+    >>>             print(f"getting archive {url}")
+    >>>             checked_archives.append(url)
+    >>>         # get the filtered archive
+    >>>         r = requests.get(url)
+    >>>         r.raise_for_status()
+    >>>         yield r.json().get("games", [])
+
+    Here we store all the urls with game archives in the state and we skip loading them on next run. The archives are immutable. The state will grow with the coming months (and more players).
+ Up to few thousand archives we should be good though. + Args: + resource_name (str, optional): forces to use state for a resource with this name. Defaults to None. + source_state_ (Optional[DictStrAny], optional): Alternative source state. Defaults to None. + + Raises: + ResourceNameNotAvailable: Raise if used outside of resource context or from a different thread than main + + Returns: + DictStrAny: State dictionary """ state_ = source_state() if source_state_ is None else source_state_ + # backtrace to find the shallowest resource + if not resource_name: + resource_name = get_current_pipe_name() + if not resource_name: + raise ResourceNameNotAvailable() return state_.setdefault('resources', {}).setdefault(resource_name, {}) # type: ignore diff --git a/dlt/common/runners/configuration.py b/dlt/common/runners/configuration.py index 80dd1948a9..124c19c1b1 100644 --- a/dlt/common/runners/configuration.py +++ b/dlt/common/runners/configuration.py @@ -1,4 +1,4 @@ -from typing import Literal, Optional +from typing import Literal, Optional, TYPE_CHECKING from dlt.common.configuration import configspec from dlt.common.configuration.specs import BaseConfiguration @@ -6,8 +6,16 @@ TPoolType = Literal["process", "thread", "none"] -@configspec +@configspec(init=True) class PoolRunnerConfiguration(BaseConfiguration): pool_type: TPoolType = None # type of pool to run, must be set in derived configs workers: Optional[int] = None # how many threads/processes in the pool run_sleep: float = 0.1 # how long to sleep between runs with workload, seconds + + if TYPE_CHECKING: + def __init__( + self, + pool_type: TPoolType = None, + workers: int = None + ) -> None: + ... diff --git a/dlt/common/source.py b/dlt/common/source.py new file mode 100644 index 0000000000..a75c2dd948 --- /dev/null +++ b/dlt/common/source.py @@ -0,0 +1,47 @@ +import threading +from types import ModuleType +from typing import Dict, NamedTuple, Optional, Type + +from dlt.common.configuration.specs import BaseConfiguration +from dlt.common.exceptions import ResourceNameNotAvailable +from dlt.common.typing import AnyFun +from dlt.common.utils import get_callable_name + + +class SourceInfo(NamedTuple): + """Runtime information on the source/resource""" + SPEC: Type[BaseConfiguration] + f: AnyFun + module: ModuleType + + +_SOURCES: Dict[str, SourceInfo] = {} +"""A registry of all the decorated sources and resources discovered when importing modules""" + +_CURRENT_PIPE_NAME: Dict[int, str] = {} +"""Name of currently executing pipe per thread id set during execution of a gen in pipe""" + + +def set_current_pipe_name(name: str) -> None: + """Set pipe name in current thread""" + _CURRENT_PIPE_NAME[threading.get_ident()] = name + + +def unset_current_pipe_name() -> None: + """Unset pipe name in current thread""" + _CURRENT_PIPE_NAME[threading.get_ident()] = None + + +def get_current_pipe_name() -> str: + """Gets pipe name associated with current thread""" + name = _CURRENT_PIPE_NAME.get(threading.get_ident()) + if name is None: + raise ResourceNameNotAvailable() + return name + + +def _get_source_for_inner_function(f: AnyFun) -> Optional[SourceInfo]: + # find source function + parts = get_callable_name(f, "__qualname__").split(".") + parent_fun = ".".join(parts[:-2]) + return _SOURCES.get(parent_fun) \ No newline at end of file diff --git a/dlt/common/storages/__init__.py b/dlt/common/storages/__init__.py index 68d8c4aea4..7b4260e9d5 100644 --- a/dlt/common/storages/__init__.py +++ b/dlt/common/storages/__init__.py @@ -1,7 +1,8 @@ from 
.file_storage import FileStorage # noqa: F401 +from .versioned_storage import VersionedStorage # noqa: F401 from .schema_storage import SchemaStorage # noqa: F401 from .live_schema_storage import LiveSchemaStorage # noqa: F401 from .normalize_storage import NormalizeStorage # noqa: F401 -from .versioned_storage import VersionedStorage # noqa: F401 from .load_storage import LoadStorage # noqa: F401 from .data_item_storage import DataItemStorage # noqa: F401 +from .configuration import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration, TSchemaFileFormat # noqa: F401 diff --git a/dlt/common/storages/configuration.py b/dlt/common/storages/configuration.py new file mode 100644 index 0000000000..78c8d5951d --- /dev/null +++ b/dlt/common/storages/configuration.py @@ -0,0 +1,38 @@ +from typing import TYPE_CHECKING, Literal, Optional, get_args + +from dlt.common.configuration.specs import BaseConfiguration, configspec + +TSchemaFileFormat = Literal["json", "yaml"] +SchemaFileExtensions = get_args(TSchemaFileFormat) + + +@configspec(init=True) +class SchemaStorageConfiguration(BaseConfiguration): + schema_volume_path: str = None # path to volume with default schemas + import_schema_path: Optional[str] = None # import schema from external location + export_schema_path: Optional[str] = None # export schema to external location + external_schema_format: TSchemaFileFormat = "yaml" # format in which to expect external schema + external_schema_format_remove_defaults: bool = True # remove default values when exporting schema + + if TYPE_CHECKING: + def __init__(self, schema_volume_path: str = None, import_schema_path: str = None, export_schema_path: str = None) -> None: + ... + + +@configspec(init=True) +class NormalizeStorageConfiguration(BaseConfiguration): + normalize_volume_path: str = None # path to volume where normalized loader files will be stored + + if TYPE_CHECKING: + def __init__(self, normalize_volume_path: str = None) -> None: + ... + + +@configspec(init=True) +class LoadStorageConfiguration(BaseConfiguration): + load_volume_path: str = None # path to volume where files to be loaded to analytical storage are stored + delete_completed_jobs: bool = False # if set to true the folder with completed jobs will be deleted + + if TYPE_CHECKING: + def __init__(self, load_volume_path: str = None, delete_completed_jobs: bool = None) -> None: + ... 
diff --git a/dlt/common/storages/live_schema_storage.py b/dlt/common/storages/live_schema_storage.py index ff7570af2b..c482d5e7ea 100644 --- a/dlt/common/storages/live_schema_storage.py +++ b/dlt/common/storages/live_schema_storage.py @@ -1,14 +1,14 @@ from typing import Dict from dlt.common.schema.schema import Schema -from dlt.common.storages.schema_storage import SchemaStorage -from dlt.common.configuration.specs import SchemaVolumeConfiguration from dlt.common.configuration.accessors import config +from dlt.common.storages.schema_storage import SchemaStorage +from dlt.common.storages.configuration import SchemaStorageConfiguration class LiveSchemaStorage(SchemaStorage): - def __init__(self, config: SchemaVolumeConfiguration = config.value, makedirs: bool = False) -> None: + def __init__(self, config: SchemaStorageConfiguration = config.value, makedirs: bool = False) -> None: self.live_schemas: Dict[str, Schema] = {} super().__init__(config, makedirs) diff --git a/dlt/common/storages/load_storage.py b/dlt/common/storages/load_storage.py index 9577120c30..141507e92f 100644 --- a/dlt/common/storages/load_storage.py +++ b/dlt/common/storages/load_storage.py @@ -14,10 +14,10 @@ from dlt.common.typing import DictStrAny, StrAny from dlt.common.storages.file_storage import FileStorage from dlt.common.data_writers import TLoaderFileFormat, DataWriter -from dlt.common.configuration.specs import LoadVolumeConfiguration from dlt.common.configuration.accessors import config from dlt.common.exceptions import TerminalValueError from dlt.common.schema import Schema, TSchemaTables, TTableSchemaColumns +from dlt.common.storages.configuration import LoadStorageConfiguration from dlt.common.storages.versioned_storage import VersionedStorage from dlt.common.storages.data_item_storage import DataItemStorage from dlt.common.storages.exceptions import JobWithUnsupportedWriterException, LoadPackageNotFound @@ -140,13 +140,13 @@ class LoadStorage(DataItemStorage, VersionedStorage): ALL_SUPPORTED_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat)) - @with_config(spec=LoadVolumeConfiguration, sections=(known_sections.LOAD,)) + @with_config(spec=LoadStorageConfiguration, sections=(known_sections.LOAD,)) def __init__( self, is_owner: bool, preferred_file_format: TLoaderFileFormat, supported_file_formats: Iterable[TLoaderFileFormat], - config: LoadVolumeConfiguration = config.value + config: LoadStorageConfiguration = config.value ) -> None: if not LoadStorage.ALL_SUPPORTED_FILE_FORMATS.issuperset(supported_file_formats): raise TerminalValueError(supported_file_formats) diff --git a/dlt/common/storages/normalize_storage.py b/dlt/common/storages/normalize_storage.py index c15c761fc6..45f541f5ec 100644 --- a/dlt/common/storages/normalize_storage.py +++ b/dlt/common/storages/normalize_storage.py @@ -1,13 +1,12 @@ -from typing import ClassVar, Sequence, NamedTuple, overload +from typing import ClassVar, Sequence, NamedTuple from itertools import groupby from pathlib import Path -from dlt.common.storages.file_storage import FileStorage from dlt.common.configuration import with_config, known_sections -from dlt.common.configuration.specs import NormalizeVolumeConfiguration -from dlt.common.storages.versioned_storage import VersionedStorage from dlt.common.configuration.accessors import config - +from dlt.common.storages.file_storage import FileStorage +from dlt.common.storages.configuration import NormalizeStorageConfiguration +from dlt.common.storages.versioned_storage import VersionedStorage class 
TParsedNormalizeFileName(NamedTuple): schema_name: str @@ -20,8 +19,8 @@ class NormalizeStorage(VersionedStorage): STORAGE_VERSION: ClassVar[str] = "1.0.0" EXTRACTED_FOLDER: ClassVar[str] = "extracted" # folder within the volume where extracted files to be normalized are stored - @with_config(spec=NormalizeVolumeConfiguration, sections=(known_sections.NORMALIZE,)) - def __init__(self, is_owner: bool, config: NormalizeVolumeConfiguration = config.value) -> None: + @with_config(spec=NormalizeStorageConfiguration, sections=(known_sections.NORMALIZE,)) + def __init__(self, is_owner: bool, config: NormalizeStorageConfiguration = config.value) -> None: super().__init__(NormalizeStorage.STORAGE_VERSION, is_owner, FileStorage(config.normalize_volume_path, "t", makedirs=is_owner)) self.config = config if is_owner: diff --git a/dlt/common/storages/schema_storage.py b/dlt/common/storages/schema_storage.py index 069dc5367e..59647f826b 100644 --- a/dlt/common/storages/schema_storage.py +++ b/dlt/common/storages/schema_storage.py @@ -1,11 +1,10 @@ import yaml -from typing import Iterator, List, Mapping, Tuple, overload +from typing import Iterator, List, Mapping, Tuple from dlt.common import json, logger from dlt.common.configuration import with_config -from dlt.common.configuration.specs import SchemaVolumeConfiguration, TSchemaFileFormat -from dlt.common.configuration.specs.schema_volume_configuration import SchemaFileExtensions from dlt.common.configuration.accessors import config +from dlt.common.storages.configuration import SchemaStorageConfiguration, TSchemaFileFormat, SchemaFileExtensions from dlt.common.storages.file_storage import FileStorage from dlt.common.schema import Schema, verify_schema_hash from dlt.common.typing import DictStrAny @@ -18,8 +17,8 @@ class SchemaStorage(Mapping[str, Schema]): SCHEMA_FILE_NAME = "schema.%s" NAMED_SCHEMA_FILE_PATTERN = f"%s.{SCHEMA_FILE_NAME}" - @with_config(spec=SchemaVolumeConfiguration, sections=("schema",)) - def __init__(self, config: SchemaVolumeConfiguration = config.value, makedirs: bool = False) -> None: + @with_config(spec=SchemaStorageConfiguration, sections=("schema",)) + def __init__(self, config: SchemaStorageConfiguration = config.value, makedirs: bool = False) -> None: self.config = config self.storage = FileStorage(config.schema_volume_path, makedirs=makedirs) diff --git a/dlt/common/utils.py b/dlt/common/utils.py index 632bd9e939..deec79d94c 100644 --- a/dlt/common/utils.py +++ b/dlt/common/utils.py @@ -355,7 +355,7 @@ def reveal_pseudo_secret(obfuscated_secret: str, pseudo_key: bytes) -> str: def get_module_name(m: ModuleType) -> str: """Gets module name from module with a fallback for executing module __main__""" - if m.__name__ == "__main__": + if m.__name__ == "__main__" and hasattr(m, "__file__"): module_file = os.path.basename(m.__file__) module_name, _ = os.path.splitext(module_file) return module_name diff --git a/dlt/extract/decorators.py b/dlt/extract/decorators.py index 11da6af0d0..9d680ce17d 100644 --- a/dlt/extract/decorators.py +++ b/dlt/extract/decorators.py @@ -2,7 +2,7 @@ import inspect from types import ModuleType from functools import wraps -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, Iterator, List, NamedTuple, Optional, Tuple, Type, TypeVar, Union, cast, overload +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Iterator, List, Optional, Tuple, Type, TypeVar, Union, cast, overload from dlt.common.configuration import with_config, get_fun_spec, known_sections, configspec from 
dlt.common.configuration.container import Container @@ -12,8 +12,9 @@ from dlt.common.configuration.specs.config_section_context import ConfigSectionContext from dlt.common.exceptions import ArgumentsOverloadException from dlt.common.pipeline import PipelineContext +from dlt.common.source import _SOURCES, SourceInfo from dlt.common.schema.schema import Schema -from dlt.common.schema.typing import TColumnKey, TColumnName, TTableSchemaColumns, TWriteDisposition +from dlt.common.schema.typing import TColumnKey, TTableSchemaColumns, TWriteDisposition from dlt.common.storages.exceptions import SchemaNotFoundError from dlt.common.storages.schema_storage import SchemaStorage from dlt.common.typing import AnyFun, ParamSpec, Concatenate, TDataItem, TDataItems @@ -37,17 +38,6 @@ class SourceSchemaInjectableContext(ContainerInjectableContext): def __init__(self, schema: Schema = None) -> None: ... - -class SourceInfo(NamedTuple): - """Runtime information on the source/resource""" - SPEC: Type[BaseConfiguration] - f: AnyFun - module: ModuleType - - -_SOURCES: Dict[str, SourceInfo] = {} -"""A registry of all the decorated sources and resources discovered when importing modules""" - TSourceFunParams = ParamSpec("TSourceFunParams") TResourceFunParams = ParamSpec("TResourceFunParams") @@ -224,8 +214,6 @@ def resource( ) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ... - - @overload def resource( data: Union[List[Any], Tuple[Any], Iterator[Any]], @@ -370,12 +358,50 @@ def decorator(f: Callable[TResourceFunParams, Any]) -> Callable[TResourceFunPara return decorator(data) else: # take name from the generator + source_section: str = None if inspect.isgenerator(data): name = name or get_callable_name(data) # type: ignore - return make_resource(name, None, data) + func_module = inspect.getmodule(data.gi_frame) + source_section = _get_source_section_name(func_module) + + return make_resource(name, source_section, data) +@overload +def transformer( + f: None = ..., + /, + data_from: TUnboundDltResource = DltResource.Empty, + name: str = None, + table_name: TTableHintTemplate[str] = None, + write_disposition: TTableHintTemplate[TWriteDisposition] = None, + columns: TTableHintTemplate[TTableSchemaColumns] = None, + primary_key: TTableHintTemplate[TColumnKey] = None, + merge_key: TTableHintTemplate[TColumnKey] = None, + selected: bool = True, + spec: Type[BaseConfiguration] = None +) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]: + ... + +@overload def transformer( + f: Callable[Concatenate[TDataItem, TResourceFunParams], Any], + /, + data_from: TUnboundDltResource = DltResource.Empty, + name: str = None, + table_name: TTableHintTemplate[str] = None, + write_disposition: TTableHintTemplate[TWriteDisposition] = None, + columns: TTableHintTemplate[TTableSchemaColumns] = None, + primary_key: TTableHintTemplate[TColumnKey] = None, + merge_key: TTableHintTemplate[TColumnKey] = None, + selected: bool = True, + spec: Type[BaseConfiguration] = None +) -> Callable[TResourceFunParams, DltResource]: + ... 
+
+def transformer(  # type: ignore
+    f: Optional[Callable[Concatenate[TDataItem, TResourceFunParams], Any]] = None,
+    /,
     data_from: TUnboundDltResource = DltResource.Empty,
     name: str = None,
     table_name: TTableHintTemplate[str] = None,
@@ -386,7 +412,14 @@ def transformer(
     selected: bool = True,
     spec: Type[BaseConfiguration] = None
 ) -> Callable[[Callable[Concatenate[TDataItem, TResourceFunParams], Any]], Callable[TResourceFunParams, DltResource]]:
-    """A form of `dlt resource` that takes input from other resources in order to enrich or transform the data.
+    """A form of `dlt resource` that takes input from other resources via the `data_from` argument in order to enrich or transform the data.
+
+    The decorated function `f` must take at least one argument of type TDataItems (a single item or a list of items, depending on the resource `data_from`). `dlt` will pass
+    metadata associated with the data item if an argument named `meta` is present. Beyond that, the transformer function may take more arguments and be parametrized
+    like the resources.
+
+    You can bind the transformer early by specifying a resource in `data_from` when the transformer is created, or create dynamic bindings later with the | operator,
+    which is demonstrated in the example below:
 
     ### Example
     >>> @dlt.resource
     >>> def players(title: str = "GM", chess_url: str = dlt.config.value) -> Iterator[TDataItems]:
     >>>     r = requests.get(f"{chess_url}players/{title}/12")
     >>>     yield r.json()["players"]  # returns list of player names
     >>>
     >>> # this resource takes data from players and returns profiles
-    >>> @dlt.transformer(data_from=players, write_disposition="replace")
+    >>> @dlt.transformer(write_disposition="replace")
     >>> def player_profile(player: Any) -> Iterator[TDataItems]:
     >>>     r = requests.get(f"{chess_url}player/{player}")
     >>>     r.raise_for_status()
     >>>     yield r.json()
     >>>
-    >>> list(players("GM") | player_profile)  # pipes the data from players into player profile to produce a list of player profiles
+    >>> # pipes the data from players into player profile to produce a list of player profiles
+    >>> list(players("GM") | player_profile)
+
+    ### Args:
+        f: (Callable): a function taking at least one argument of TDataItems type which will receive data yielded from the `data_from` resource.
+
+        data_from (Callable | Any, optional): a resource that will send data to the decorated function `f`
+
+        name (str, optional): A name of the resource that by default also becomes the name of the table to which the data is loaded.
+        If not present, the name of the decorated function will be used.
+
+        table_name (TTableHintTemplate[str], optional): A table name, if different from `name`.
+        This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        write_disposition (Literal["skip", "append", "replace", "merge"], optional): Controls how to write data to a table. `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append".
+        This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
+        This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        primary_key (str | Sequence[str]): A column name or a list of column names that comprise a primary key. Typically used with "merge" write disposition to deduplicate loaded data.
+        This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        merge_key (str | Sequence[str]): A column name or a list of column names that define a merge key. Typically used with "merge" write disposition to remove overlapping data ranges i.e. to keep a single record for a given day.
+        This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many datatypes.
+
+        selected (bool, optional): When `True` `dlt pipeline` will extract and load this resource, if `False`, the resource will be ignored.
+
+        spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.
     """
-    f: AnyFun = None
-    # if data_from is a function we are called without parens
-    if inspect.isfunction(data_from):
-        f = data_from
-        data_from = DltResource.Empty
+    if isinstance(f, DltResource):
+        raise ValueError("Please pass `data_from=` argument as keyword argument. The only positional argument to transformer is the decorated function")
+
     return resource(  # type: ignore
         f,
         name=name,
@@ -435,14 +493,9 @@ def _maybe_load_schema_for_callable(f: AnyFun, name: str) -> Optional[Schema]:
     return None
 
 
-def _get_source_for_inner_function(f: AnyFun) -> Optional[SourceInfo]:
-    # find source function
-    parts = get_callable_name(f, "__qualname__").split(".")
-    parent_fun = ".".join(parts[:-2])
-    return _SOURCES.get(parent_fun)
-
-
 def _get_source_section_name(m: ModuleType) -> str:
+    if m is None:
+        return None
     if hasattr(m, "__source_name__"):
         return cast(str, m.__source_name__)
     return get_module_name(m)
@@ -456,27 +509,6 @@ def get_source_schema() -> Schema:
         raise SourceSchemaNotAvailable()
 
 
-# def with_retry(max_retries: int = 3, retry_sleep: float = 1.0) -> Callable[[Callable[_TFunParams, TBoundItem]], Callable[_TFunParams, TBoundItem]]:
-
-#     def decorator(f: Callable[_TFunParams, TBoundItem]) -> Callable[_TFunParams, TBoundItem]:
-
-#         def _wrap(*args: Any, **kwargs: Any) -> TBoundItem:
-#             attempts = 0
-#             while True:
-#                 try:
-#                     return f(*args, **kwargs)
-#                 except Exception as exc:
-#                     if attempts == max_retries:
-#                         raise
-#                     attempts += 1
-#                     logger.warning(f"Exception {exc} in iterator, retrying {attempts} / {max_retries}")
-#                     sleep(retry_sleep)
-
-#         return _wrap
-
-#     return decorator
-
-
 TBoundItems = TypeVar("TBoundItems", bound=TDataItems)
 TDeferred = Callable[[], TBoundItems]
 TDeferredFunParams = ParamSpec("TDeferredFunParams")
diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py
index 6ee07174f5..4eae011926 100644
--- a/dlt/extract/extract.py
+++ b/dlt/extract/extract.py
@@ -12,8 +12,8 @@
 from dlt.common.utils import uniq_id
 from dlt.common.typing import TDataItems, TDataItem
 from dlt.common.schema import Schema, utils, TSchemaUpdate
-from dlt.common.storages import NormalizeStorage, DataItemStorage
-from dlt.common.configuration.specs import NormalizeVolumeConfiguration, known_sections
+from dlt.common.storages import NormalizeStorageConfiguration, NormalizeStorage, DataItemStorage
+from dlt.common.configuration.specs import known_sections
 
 from dlt.extract.decorators import SourceSchemaInjectableContext
 from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints
@@ -25,7 +25,7 @@ class ExtractorStorage(DataItemStorage, NormalizeStorage):
    EXTRACT_FOLDER: 
ClassVar[str] = "extract" - def __init__(self, C: NormalizeVolumeConfiguration) -> None: + def __init__(self, C: NormalizeStorageConfiguration) -> None: # data item storage with jsonl with pua encoding super().__init__("puae-jsonl", True, C) self.storage.create_folder(ExtractorStorage.EXTRACT_FOLDER, exists_ok=True) diff --git a/dlt/extract/incremental.py b/dlt/extract/incremental.py index a3d682e734..ea71ca2327 100644 --- a/dlt/extract/incremental.py +++ b/dlt/extract/incremental.py @@ -10,7 +10,7 @@ from dlt.common.schema.typing import TColumnKey from dlt.common.configuration import configspec from dlt.common.configuration.specs import BaseConfiguration -from dlt.common.pipeline import _resource_state +from dlt.common.pipeline import resource_state from dlt.common.utils import digest128 from dlt.extract.exceptions import IncrementalUnboundError, PipeException from dlt.extract.pipe import Pipe @@ -145,7 +145,7 @@ def get_state(self) -> IncrementalColumnState: @staticmethod def _get_state(resource_name: str, cursor_path: str) -> IncrementalColumnState: - state: IncrementalColumnState = _resource_state(resource_name).setdefault('incremental', {}).setdefault(cursor_path, {}) + state: IncrementalColumnState = resource_state(resource_name).setdefault('incremental', {}).setdefault(cursor_path, {}) # if state params is empty return state diff --git a/dlt/extract/pipe.py b/dlt/extract/pipe.py index e1045abf9b..61dda07b39 100644 --- a/dlt/extract/pipe.py +++ b/dlt/extract/pipe.py @@ -13,6 +13,7 @@ from dlt.common.configuration.inject import with_config from dlt.common.configuration.specs import BaseConfiguration from dlt.common.exceptions import PipelineException +from dlt.common.source import unset_current_pipe_name, set_current_pipe_name from dlt.common.typing import AnyFun, AnyType, TDataItems from dlt.common.utils import get_callable_name @@ -551,14 +552,15 @@ def __next__(self) -> PipeItem: # advance to next step step = pipe_item.pipe[pipe_item.step + 1] - assert callable(step) try: + set_current_pipe_name(pipe_item.pipe.name) next_meta = pipe_item.meta next_item = step(item, meta=pipe_item.meta) # type: ignore if isinstance(next_item, DataItemWithMeta): next_meta = next_item.meta next_item = next_item.data except TypeError as ty_ex: + assert callable(step) raise InvalidStepFunctionArguments(pipe_item.pipe.name, get_callable_name(step), inspect.signature(step), str(ty_ex)) except (PipelineException, ExtractorException, DltSourceException, PipeException): raise @@ -571,6 +573,9 @@ def __next__(self) -> PipeItem: pipe_item = None def close(self) -> None: + # unregister the pipe name right after execution of gen stopped + unset_current_pipe_name() + def stop_background_loop(loop: asyncio.AbstractEventLoop) -> None: loop.stop() @@ -667,8 +672,8 @@ def _get_source_item(self) -> ResolvablePipeItem: # get items from last added iterator, this makes the overall Pipe as close to FIFO as possible gen, step, pipe, meta = self._sources[-1] # print(f"got {pipe.name} {pipe._pipe_id}") - # TODO: count items coming of the generator and stop the generator if reached. 
that allows for sampling the beginning of a stream - # _counts(id(gen)).setdefault(0) + 1 + # register current pipe name during the execution of gen + set_current_pipe_name(pipe.name) try: item = next(gen) # full pipe item may be returned, this is used by ForkPipe step diff --git a/dlt/extract/source.py b/dlt/extract/source.py index 0f5387c3da..ded935c757 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -12,7 +12,7 @@ from dlt.common.schema.typing import TColumnName from dlt.common.typing import AnyFun, StrAny, TDataItem, TDataItems, NoneType from dlt.common.configuration.container import Container -from dlt.common.pipeline import PipelineContext, StateInjectableContext, SupportsPipelineRun, _resource_state, source_state, pipeline_state +from dlt.common.pipeline import PipelineContext, StateInjectableContext, SupportsPipelineRun, resource_state, source_state, pipeline_state from dlt.common.utils import flatten_list_or_items, get_callable_name, multi_context_manager, uniq_id from dlt.extract.typing import DataItemWithMeta, ItemTransformFunc, ItemTransformFunctionWithMeta, TableNameMeta, FilterItem, MapItem, YieldMapItem @@ -296,7 +296,7 @@ def bind(self, *args: Any, **kwargs: Any) -> "DltResource": def state(self) -> StrAny: """Gets resource-scoped state from the existing pipeline context. If pipeline context is not available, PipelineStateNotAvailable is raised""" with self._get_config_section_context(): - return _resource_state(self.name) + return resource_state(self.name) def __call__(self, *args: Any, **kwargs: Any) -> "DltResource": """Binds the parametrized resources to passed arguments. Creates and returns a bound resource. Generators and iterators are not evaluated.""" @@ -336,7 +336,7 @@ def _get_context() -> List[ContextManager[Any]]: # managed pipe iterator will remove injected contexts when closing with multi_context_manager(_get_context()): - pipe_iterator: ManagedPipeIterator = ManagedPipeIterator.from_pipe(self._pipe) # type: ignore + pipe_iterator: ManagedPipeIterator = ManagedPipeIterator.from_pipes([self._pipe]) # type: ignore pipe_iterator.set_context_manager(multi_context_manager(_get_context())) _iter = map(lambda item: item.item, pipe_iterator) @@ -346,7 +346,9 @@ def _get_config_section_context(self) -> ContextManager[ConfigSectionContext]: container = Container() proxy = container[PipelineContext] pipeline_name = None if not proxy.is_active() else proxy.pipeline().pipeline_name - return inject_section(ConfigSectionContext(pipeline_name=pipeline_name, sections=(known_sections.SOURCES, self.section or pipeline_name or uniq_id(), self._name))) + return inject_section( + ConfigSectionContext(pipeline_name=pipeline_name, sections=(known_sections.SOURCES, self.section or pipeline_name or uniq_id(), self._name)) + ) def __str__(self) -> str: info = f"DltResource {self._name}" @@ -511,6 +513,7 @@ class DltSource(Iterable[TDataItem]): * It implements `Iterable` interface so you can get all the data from the resources yourself and without dlt pipeline present. * You can get the `schema` for the source and all the resources within it. * You can use a `run` method to load the data with a default instance of dlt pipeline. 
+    * You can get the source's read-only state for the currently active Pipeline instance
     """
     def __init__(self, name: str, section: str, schema: Schema, resources: Sequence[DltResource] = None) -> None:
         self.name = name
@@ -602,9 +605,10 @@ def discover_schema(self, item: TDataItem = None) -> Schema:
         return schema
 
     def with_resources(self, *resource_names: str) -> "DltSource":
-        """A convenience method to select one of more resources to be loaded. Returns a source with the specified resources selected."""
-        self._resources.select(*resource_names)
-        return self
+        """A convenience method to select one or more resources to be loaded. Returns a clone of the original source with the specified resources selected."""
+        source = self.clone()
+        source._resources.select(*resource_names)
+        return source
 
 
     def add_limit(self, max_items: int) -> "DltSource":  # noqa: A003
@@ -634,6 +638,11 @@ def state(self) -> StrAny:
         with self._get_config_section_context():
             return source_state()
 
+    def clone(self) -> "DltSource":
+        """Creates a deep copy of the source where copies of schema, resources and pipes are created"""
+        # mind that resources and pipes are cloned when added to the DltResourcesDict in the source constructor
+        return DltSource(self.name, self.section, self.schema.clone(), list(self._resources.values()))
+
     def __iter__(self) -> Iterator[TDataItem]:
         """Opens iterator that yields the data items from all the resources within the source in the same order as in Pipeline class.
diff --git a/dlt/load/configuration.py b/dlt/load/configuration.py
index bead44cb6c..34f24780bb 100644
--- a/dlt/load/configuration.py
+++ b/dlt/load/configuration.py
@@ -1,7 +1,7 @@
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING
 
 from dlt.common.configuration import configspec
-from dlt.common.configuration.specs import LoadVolumeConfiguration
+from dlt.common.storages import LoadStorageConfiguration
 from dlt.common.runners.configuration import PoolRunnerConfiguration, TPoolType
 
 
@@ -14,14 +14,14 @@ class LoaderConfiguration(PoolRunnerConfiguration):
     """when True, raises on terminally failed jobs immediately"""
     raise_on_max_retries: int = 5
     """When gt 0 will raise when job reaches raise_on_max_retries"""
-    _load_storage_config: LoadVolumeConfiguration = None
+    _load_storage_config: LoadStorageConfiguration = None
 
     if TYPE_CHECKING:
         def __init__(
             self,
-            pool_type: TPoolType = None,
+            pool_type: TPoolType = "thread",
             workers: int = None,
             raise_on_failed_jobs: bool = False,
-            _load_storage_config: LoadVolumeConfiguration = None
+            _load_storage_config: LoadStorageConfiguration = None
         ) -> None:
             ... 
diff --git a/dlt/normalize/configuration.py b/dlt/normalize/configuration.py index f31372efac..d4191d2cf9 100644 --- a/dlt/normalize/configuration.py +++ b/dlt/normalize/configuration.py @@ -1,26 +1,26 @@ from typing import TYPE_CHECKING from dlt.common.configuration import configspec -from dlt.common.configuration.specs import LoadVolumeConfiguration, NormalizeVolumeConfiguration, SchemaVolumeConfiguration from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.runners.configuration import PoolRunnerConfiguration, TPoolType +from dlt.common.storages import LoadStorageConfiguration, NormalizeStorageConfiguration, SchemaStorageConfiguration @configspec(init=True) class NormalizeConfiguration(PoolRunnerConfiguration): pool_type: TPoolType = "process" destination_capabilities: DestinationCapabilitiesContext = None # injectable - _schema_storage_config: SchemaVolumeConfiguration - _normalize_storage_config: NormalizeVolumeConfiguration - _load_storage_config: LoadVolumeConfiguration + _schema_storage_config: SchemaStorageConfiguration + _normalize_storage_config: NormalizeStorageConfiguration + _load_storage_config: LoadStorageConfiguration if TYPE_CHECKING: def __init__( self, - pool_type: TPoolType = None, + pool_type: TPoolType = "process", workers: int = None, - _schema_storage_config: SchemaVolumeConfiguration = None, - _normalize_storage_config: NormalizeVolumeConfiguration = None, - _load_storage_config: LoadVolumeConfiguration = None + _schema_storage_config: SchemaStorageConfiguration = None, + _normalize_storage_config: NormalizeStorageConfiguration = None, + _load_storage_config: LoadStorageConfiguration = None ) -> None: ... diff --git a/dlt/normalize/normalize.py b/dlt/normalize/normalize.py index 3d2510c5c8..1d4f25a43c 100644 --- a/dlt/normalize/normalize.py +++ b/dlt/normalize/normalize.py @@ -6,7 +6,6 @@ from dlt.common.configuration import with_config, known_sections from dlt.common.configuration.accessors import config from dlt.common.configuration.container import Container -from dlt.common.configuration.specs import LoadVolumeConfiguration, NormalizeVolumeConfiguration from dlt.common.destination import DestinationCapabilitiesContext, TLoaderFileFormat from dlt.common.json import custom_pua_decode from dlt.common.runners import TRunMetrics, Runnable @@ -15,7 +14,7 @@ from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns from dlt.common.schema.utils import merge_schema_updates from dlt.common.storages.exceptions import SchemaNotFoundError -from dlt.common.storages import NormalizeStorage, SchemaStorage, LoadStorage +from dlt.common.storages import NormalizeStorage, SchemaStorage, LoadStorage, LoadStorageConfiguration, NormalizeStorageConfiguration from dlt.common.typing import TDataItem from dlt.common.schema import TSchemaUpdate, Schema from dlt.common.schema.exceptions import CannotCoerceColumnException @@ -66,8 +65,8 @@ def load_or_create_schema(schema_storage: SchemaStorage, schema_name: str) -> Sc @staticmethod def w_normalize_files( - normalize_storage_config: NormalizeVolumeConfiguration, - loader_storage_config: LoadVolumeConfiguration, + normalize_storage_config: NormalizeStorageConfiguration, + loader_storage_config: LoadStorageConfiguration, destination_caps: DestinationCapabilitiesContext, stored_schema: TStoredSchema, load_id: str, diff --git a/dlt/pipeline/current.py b/dlt/pipeline/current.py index 1becbd1ff4..f915a30932 100644 --- a/dlt/pipeline/current.py +++ b/dlt/pipeline/current.py @@ -1,6 +1,6 @@ """Easy access 
to active pipelines, state, sources and schemas""" -from dlt.common.pipeline import source_state as _state +from dlt.common.pipeline import source_state as _state, resource_state from dlt.pipeline import pipeline as _pipeline from dlt.extract.decorators import get_source_schema diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 1280059401..67992b82c0 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -9,7 +9,7 @@ from dlt import version from dlt.common import json, logger, pendulum from dlt.common.configuration import inject_section, known_sections -from dlt.common.configuration.specs import RunConfiguration, NormalizeVolumeConfiguration, SchemaVolumeConfiguration, LoadVolumeConfiguration, CredentialsConfiguration +from dlt.common.configuration.specs import RunConfiguration, CredentialsConfiguration from dlt.common.configuration.container import Container from dlt.common.configuration.exceptions import ConfigFieldMissingException, ContextDefaultCannotBeCreated from dlt.common.configuration.specs.config_section_context import ConfigSectionContext @@ -18,15 +18,14 @@ from dlt.common.runtime import signals, initialize_runtime from dlt.common.schema.exceptions import InvalidDatasetName from dlt.common.schema.typing import TColumnSchema, TSchemaTables, TWriteDisposition -from dlt.common.storages.load_storage import LoadJobInfo, LoadPackageInfo, LoadStorage +from dlt.common.storages.load_storage import LoadJobInfo, LoadPackageInfo from dlt.common.typing import TFun, TSecretValue from dlt.common.runners import pool_runner as runner -from dlt.common.storages import LiveSchemaStorage, NormalizeStorage, SchemaStorage +from dlt.common.storages import LiveSchemaStorage, NormalizeStorage, LoadStorage, SchemaStorage, FileStorage, NormalizeStorageConfiguration, SchemaStorageConfiguration, LoadStorageConfiguration from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import DestinationReference, JobClientBase, DestinationClientConfiguration, DestinationClientDwhConfiguration, TDestinationReferenceArg from dlt.common.pipeline import ExtractInfo, LoadInfo, NormalizeInfo, SupportsPipeline, TPipelineLocalState, TPipelineState, StateInjectableContext from dlt.common.schema import Schema -from dlt.common.storages.file_storage import FileStorage from dlt.common.utils import is_interactive from dlt.destinations.exceptions import DatabaseUndefinedRelation @@ -199,9 +198,9 @@ def __init__( self._pipeline_instance_id = self._create_pipeline_instance_id() self._pipeline_storage: FileStorage = None self._schema_storage: LiveSchemaStorage = None - self._schema_storage_config: SchemaVolumeConfiguration = None - self._normalize_storage_config: NormalizeVolumeConfiguration = None - self._load_storage_config: LoadVolumeConfiguration = None + self._schema_storage_config: SchemaStorageConfiguration = None + self._normalize_storage_config: NormalizeStorageConfiguration = None + self._load_storage_config: LoadStorageConfiguration = None self._trace: PipelineTrace = None self._last_trace: PipelineTrace = None self._state_restored: bool = False @@ -660,14 +659,14 @@ def _init_working_dir(self, pipeline_name: str, pipelines_dir: str) -> None: def _configure(self, import_schema_path: str, export_schema_path: str, must_attach_to_local_pipeline: bool) -> None: # create schema storage and folders - self._schema_storage_config = SchemaVolumeConfiguration( + self._schema_storage_config = SchemaStorageConfiguration( 
schema_volume_path=os.path.join(self.working_dir, "schemas"), import_schema_path=import_schema_path, export_schema_path=export_schema_path ) # create default configs - self._normalize_storage_config = NormalizeVolumeConfiguration(normalize_volume_path=os.path.join(self.working_dir, "normalize")) - self._load_storage_config = LoadVolumeConfiguration(load_volume_path=os.path.join(self.working_dir, "load"),) + self._normalize_storage_config = NormalizeStorageConfiguration(normalize_volume_path=os.path.join(self.working_dir, "normalize")) + self._load_storage_config = LoadStorageConfiguration(load_volume_path=os.path.join(self.working_dir, "load"),) # are we running again? has_state = self._pipeline_storage.has_file(Pipeline.STATE_FILE) diff --git a/docs/website/docs/running-in-production/running.md b/docs/website/docs/running-in-production/running.md index c1718e0bd5..3d2b3a3877 100644 --- a/docs/website/docs/running-in-production/running.md +++ b/docs/website/docs/running-in-production/running.md @@ -155,7 +155,7 @@ print(load_info.has_failed_jobs) load_info.raise_on_failed_jobs() ``` -You may also immediately abort the load package with `LoadClientJobFailed` (terminal exception) on a first failed job. Such package is immediately moved to completed but its load id is not added to the `_dlt_loads` table. The other jobs that are executed in parallel may or may not complete. The dlt state, if present, will not be visible to `dlt`. In other words: **you should know what you are doing**. Here's example `config.toml` to enable this option: +You may also abort the load package with `LoadClientJobFailed` (terminal exception) on the first failed job. Such a package is immediately moved to completed, but its load id is not added to the `_dlt_loads` table. All the jobs that were running in parallel are completed before raising. The dlt state, if present, will not be visible to `dlt`. Here's an example `config.toml` that enables this option: ```toml # you should really load just one job at a time to get the deterministic behavior @@ -176,7 +176,7 @@ Before adding retry to pipeline steps, note how `run` method actually works: By default `dlt` does not retry any of the pipeline steps. This is left to the included helpers and the [tenacity](https://tenacity.readthedocs.io/en/latest/) library. Snippet below will retry the `load` stage with the `retry_load` strategy and defined back-off or re-raise exception for any other steps (`extract`, `normalize`) and for terminal exceptions. ```python -from tenacity import retry_if_exception, Retrying, retry +from tenacity import stop_after_attempt, retry_if_exception, Retrying, retry from dlt.common.runtime.slack import send_slack_message from dlt.pipeline.helpers import retry_load @@ -186,7 +186,7 @@ if __name__ == "__main__" : data = chess(['magnuscarlsen', 'rpragchess'], start_month="2022/11", end_month="2022/12") try: - for attempt in Retrying(wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception(retry_load(())), reraise=True): + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1.5, min=4, max=10), retry=retry_if_exception(retry_load(())), reraise=True): with attempt: load_info = p.run(data) send_slack_message(pipeline.runtime_config.slack_incoming_hook, "HOORAY 😄") @@ -202,7 +202,7 @@ You can also use `tenacity` to decorate functions.
This example additionally ret if __name__ == "__main__" : pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination='duckdb', dataset_name="games_data") - @retry(wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_if_exception(retry_load(("extract", "load"))), reraise=True) + @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1.5, min=4, max=10), retry=retry_if_exception(retry_load(("extract", "load"))), reraise=True) def load(): data = chess(['magnuscarlsen','vincentkeymer', 'dommarajugukesh', 'rpragchess'], start_month="2022/11", end_month="2022/12") return pipeline.run(data) diff --git a/pyproject.toml b/pyproject.toml index d5d7baa66f..4dc7476820 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dlt" -version = "0.2.8a0" +version = "0.2.8" description = "DLT is an open-source python-native scalable data loading framework that does not require any devops efforts to run." authors = ["dltHub Inc. "] maintainers = [ "Marcin Rudolf ", "Adrian Brudaru ", "Ty Dunn "] diff --git a/tests/cli/test_init_command.py b/tests/cli/test_init_command.py index 0aaa1d695f..b44fc6b5d9 100644 --- a/tests/cli/test_init_command.py +++ b/tests/cli/test_init_command.py @@ -15,13 +15,13 @@ from dlt.common.configuration.providers import CONFIG_TOML, SECRETS_TOML, SecretsTomlProvider from dlt.common.runners import Venv from dlt.common.storages.file_storage import FileStorage +from dlt.common.source import _SOURCES from dlt.common.utils import set_working_dir from dlt.cli import init_command, echo from dlt.cli.init_command import PIPELINES_MODULE_NAME, utils as cli_utils, files_ops, _select_pipeline_files from dlt.cli.exceptions import CliCommandException -from dlt.extract.decorators import _SOURCES from dlt.reflection.script_visitor import PipelineScriptVisitor from dlt.reflection import names as n diff --git a/tests/cli/utils.py b/tests/cli/utils.py index 7e8eb0344b..ba684ca45c 100644 --- a/tests/cli/utils.py +++ b/tests/cli/utils.py @@ -5,12 +5,11 @@ from dlt.common import git from dlt.common.pipeline import get_dlt_repos_dir from dlt.common.storages.file_storage import FileStorage - +from dlt.common.source import _SOURCES from dlt.common.utils import set_working_dir, uniq_id from dlt.cli import echo from dlt.cli.init_command import DEFAULT_PIPELINES_REPO -from dlt.extract.decorators import _SOURCES from tests.utils import TEST_STORAGE_ROOT diff --git a/tests/common/configuration/test_accessors.py b/tests/common/configuration/test_accessors.py index 52d731bdc2..6c01f66d97 100644 --- a/tests/common/configuration/test_accessors.py +++ b/tests/common/configuration/test_accessors.py @@ -7,9 +7,11 @@ from dlt.common.configuration.exceptions import ConfigFieldMissingException from dlt.common.configuration.providers import EnvironProvider, ConfigTomlProvider, SecretsTomlProvider +from dlt.common.configuration.resolve import resolve_configuration from dlt.common.configuration.specs import GcpServiceAccountCredentialsWithoutDefaults, ConnectionStringCredentials from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext from dlt.common.configuration.utils import get_resolved_traces, ResolvedValueTrace +from dlt.common.runners.configuration import PoolRunnerConfiguration from dlt.common.typing import AnyType, TSecretValue @@ -121,6 +123,33 @@ def test_getter_accessor_typed(toml_providers: ConfigProvidersContext, environme assert c.client_email == "loader@a7513.iam.gserviceaccount.com" +def test_setter(toml_providers: 
ConfigProvidersContext, environment: Any) -> None: + assert dlt.secrets.writable_provider.name == "secrets.toml" + assert dlt.config.writable_provider.name == "config.toml" + + dlt.config["new_key"] = "new_value" + assert dlt.config["new_key"] == "new_value" + # not visible through secrets now (config.toml not included) + with pytest.raises(KeyError): + assert dlt.secrets["new_key"] == "new_value" + + dlt.secrets["new_secret"] = TSecretValue("a_secret") + assert dlt.secrets["new_secret"] == "a_secret" + # now visible (config is in secrets) + assert dlt.config["new_secret"] == "a_secret" + + # add sections + dlt.secrets["pipeline.new.credentials"] = {"api_key": "skjo87a7nnAAaa"} + assert dlt.secrets["pipeline.new.credentials"] == {"api_key": "skjo87a7nnAAaa"} + # check the toml directly + assert dlt.secrets.writable_provider._toml["pipeline"]["new"]["credentials"] == {"api_key": "skjo87a7nnAAaa"} + + # mod the config and use it to resolve the configuration + dlt.config["pool"] = {"pool_type": "process", "workers": 21} + c = resolve_configuration(PoolRunnerConfiguration(), sections=("pool", )) + assert dict(c) == {"pool_type": "process", "workers": 21, 'run_sleep': 0.1} + + def test_secrets_separation(toml_providers: ConfigProvidersContext) -> None: # secrets are available both in config and secrets assert dlt.config.get("credentials") is not None diff --git a/tests/common/configuration/test_toml_provider.py b/tests/common/configuration/test_toml_provider.py index 1e67ba7032..a7ec871477 100644 --- a/tests/common/configuration/test_toml_provider.py +++ b/tests/common/configuration/test_toml_provider.py @@ -6,18 +6,19 @@ from unittest.mock import patch import dlt -from dlt.common import pendulum +from dlt.common import pendulum, Decimal from dlt.common.configuration import configspec, ConfigFieldMissingException, resolve from dlt.common.configuration.container import Container from dlt.common.configuration.inject import with_config from dlt.common.configuration.exceptions import LookupTrace -from dlt.common.configuration.providers.toml import SECRETS_TOML, CONFIG_TOML, SecretsTomlProvider, ConfigTomlProvider, TomlProviderReadException +from dlt.common.configuration.providers.toml import SECRETS_TOML, CONFIG_TOML, BaseTomlProvider, SecretsTomlProvider, ConfigTomlProvider, TomlProviderReadException from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext from dlt.common.configuration.specs import BaseConfiguration, GcpServiceAccountCredentialsWithoutDefaults, ConnectionStringCredentials +from dlt.common.runners.configuration import PoolRunnerConfiguration from dlt.common.typing import TSecretValue from tests.utils import preserve_environ -from tests.common.configuration.utils import WithCredentialsConfiguration, CoercionTestConfiguration, COERCIONS, SecretConfiguration, environment, toml_providers +from tests.common.configuration.utils import SecretCredentials, WithCredentialsConfiguration, CoercionTestConfiguration, COERCIONS, SecretConfiguration, environment, toml_providers @configspec @@ -236,3 +237,118 @@ def test_toml_global_config() -> None: # check if values from project exist secrets_project = SecretsTomlProvider(add_global_config=False) assert secrets._toml == secrets_project._toml + + +def test_write_value(toml_providers: ConfigProvidersContext) -> None: + provider: BaseTomlProvider + for provider in toml_providers.providers: + if not provider.is_writable: + continue + # set single key + provider.set_value("_new_key_bool", True, None) + assert 
provider.get_value("_new_key_bool", Any, None) == (True, "_new_key_bool") + provider.set_value("_new_key_literal", TSecretValue("literal"), None) + assert provider.get_value("_new_key_literal", Any, None) == ("literal", "_new_key_literal") + # this will create path of tables + provider.set_value("deep_int", 2137, "deep_pipeline", "deep", "deep", "deep", "deep") + assert provider._toml["deep_pipeline"]["deep"]["deep"]["deep"]["deep"]["deep_int"] == 2137 + assert provider.get_value("deep_int", Any, "deep_pipeline", "deep", "deep", "deep", "deep") == (2137, "deep_pipeline.deep.deep.deep.deep.deep_int") + # same without the pipeline + now = pendulum.now() + provider.set_value("deep_date", now, None, "deep", "deep", "deep", "deep") + assert provider.get_value("deep_date", Any, None, "deep", "deep", "deep", "deep") == (now, "deep.deep.deep.deep.deep_date") + # in existing path + provider.set_value("deep_list", [1, 2, 3], None, "deep", "deep", "deep") + assert provider.get_value("deep_list", Any, None, "deep", "deep", "deep") == ([1, 2, 3], "deep.deep.deep.deep_list") + # still there + assert provider.get_value("deep_date", Any, None, "deep", "deep", "deep", "deep") == (now, "deep.deep.deep.deep.deep_date") + # overwrite value + provider.set_value("deep_list", [1, 2, 3, 4], None, "deep", "deep", "deep") + assert provider.get_value("deep_list", Any, None, "deep", "deep", "deep") == ([1, 2, 3, 4], "deep.deep.deep.deep_list") + # invalid type + with pytest.raises(ValueError): + provider.set_value("deep_decimal", Decimal("1.2"), None, "deep", "deep", "deep", "deep") + + # write new dict to a new key + test_d1 = {"key": "top", "embed": {"inner": "bottom", "inner_2": True}} + provider.set_value("deep_dict", test_d1, None, "dict_test") + assert provider.get_value("deep_dict", Any, None, "dict_test") == (test_d1, "dict_test.deep_dict") + # write same dict over dict + provider.set_value("deep_dict", test_d1, None, "dict_test") + assert provider.get_value("deep_dict", Any, None, "dict_test") == (test_d1, "dict_test.deep_dict") + # get a fragment + assert provider.get_value("inner_2", Any, None, "dict_test", "deep_dict", "embed") == (True, "dict_test.deep_dict.embed.inner_2") + # write a dict over non dict + provider.set_value("deep_list", test_d1, None, "deep", "deep", "deep") + assert provider.get_value("deep_list", Any, None, "deep", "deep", "deep") == (test_d1, "deep.deep.deep.deep_list") + # merge dicts + test_d2 = {"key": "_top", "key2": "new2", "embed": {"inner": "_bottom", "inner_3": 2121}} + provider.set_value("deep_dict", test_d2, None, "dict_test") + test_m_d1_d2 = { + "key": "_top", + "embed": {"inner": "_bottom", "inner_2": True, "inner_3": 2121}, + "key2": "new2" + } + assert provider.get_value("deep_dict", Any, None, "dict_test") == (test_m_d1_d2, "dict_test.deep_dict") + # print(provider.get_value("deep_dict", Any, None, "dict_test")) + + # write configuration + pool = PoolRunnerConfiguration(pool_type="none", workers=10) + provider.set_value("runner_config", dict(pool), "new_pipeline") + # print(provider._toml["new_pipeline"]["runner_config"].as_string()) + assert provider._toml["new_pipeline"]["runner_config"] == dict(pool) + + # dict creates only shallow dict so embedded credentials will fail + creds = WithCredentialsConfiguration() + creds.credentials = SecretCredentials({"secret_value": "***** ***"}) + with pytest.raises(ValueError): + provider.set_value("written_creds", dict(creds), None) + + +def test_write_toml_value(toml_providers: ConfigProvidersContext) -> None: + provider: 
BaseTomlProvider + for provider in toml_providers.providers: + if not provider.is_writable: + continue + + new_doc = tomlkit.parse(""" +int_val=2232 + +[table] +inner_int_val=2121 + """) + + # key == None replaces the whole document + provider.set_value(None, new_doc, None) + assert provider._toml == new_doc + + # key != None merges documents + to_merge_doc = tomlkit.parse(""" +int_val=2137 + +[babble] +word1="do" +word2="you" + + """) + provider.set_value("", to_merge_doc, None) + merged_doc = tomlkit.parse(""" +int_val=2137 + +[babble] +word1="do" +word2="you" + +[table] +inner_int_val=2121 + + """) + assert provider._toml == merged_doc + + # currently we ignore the key when merging tomlkit + provider.set_value("level", to_merge_doc, None) + assert provider._toml == merged_doc + + # only toml accepted with empty key + with pytest.raises(ValueError): + provider.set_value(None, {}, None) diff --git a/tests/common/runners/test_runnable.py b/tests/common/runners/test_runnable.py index 42b9ad1479..9ba621d6fe 100644 --- a/tests/common/runners/test_runnable.py +++ b/tests/common/runners/test_runnable.py @@ -4,7 +4,7 @@ from multiprocessing.pool import Pool from multiprocessing.dummy import Pool as ThreadPool -from dlt.normalize.configuration import SchemaVolumeConfiguration +from dlt.normalize.configuration import SchemaStorageConfiguration from tests.common.runners.utils import _TestRunnableWorkerMethod, _TestRunnableWorker, ALL_METHODS, mp_method_auto @@ -67,7 +67,7 @@ def test_weak_pool_ref() -> None: def test_configuredworker() -> None: # call worker method with CONFIG values that should be restored into CONFIG type - config = SchemaVolumeConfiguration() + config = SchemaStorageConfiguration() config["import_schema_path"] = "test_schema_path" _worker_1(config, "PX1", par2="PX2") @@ -76,9 +76,9 @@ def test_configuredworker() -> None: p.starmap(_worker_1, [(config, "PX1", "PX2")]) -def _worker_1(CONFIG: SchemaVolumeConfiguration, par1: str, par2: str = "DEFAULT") -> None: +def _worker_1(CONFIG: SchemaStorageConfiguration, par1: str, par2: str = "DEFAULT") -> None: # a correct type was passed - assert type(CONFIG) is SchemaVolumeConfiguration + assert type(CONFIG) is SchemaStorageConfiguration # check if config values are restored assert CONFIG.import_schema_path == "test_schema_path" # check if other parameters are correctly diff --git a/tests/common/runners/test_runners.py b/tests/common/runners/test_runners.py index c8fbe247d3..9bf7928c07 100644 --- a/tests/common/runners/test_runners.py +++ b/tests/common/runners/test_runners.py @@ -14,19 +14,19 @@ from tests.utils import init_test_logging -@configspec +@configspec(init=True) class ModPoolRunnerConfiguration(PoolRunnerConfiguration): pipeline_name: str = "testrunners" pool_type: TPoolType = "none" run_sleep: float = 0.1 -@configspec +@configspec(init=True) class ProcessPoolConfiguration(ModPoolRunnerConfiguration): pool_type: TPoolType = "process" -@configspec +@configspec(init=True) class ThreadPoolConfiguration(ModPoolRunnerConfiguration): pool_type: TPoolType = "thread" diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 931be2a895..bcabe6f9d9 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -6,7 +6,7 @@ from dlt.common import pendulum from dlt.common.configuration import resolve_configuration from dlt.common.configuration.container import Container -from dlt.common.configuration.specs import SchemaVolumeConfiguration +from dlt.common.storages import 
SchemaStorageConfiguration from dlt.common.destination.capabilities import DestinationCapabilitiesContext from dlt.common.exceptions import DictValidationException from dlt.common.normalizers.naming import snake_case, direct @@ -27,7 +27,7 @@ @pytest.fixture def schema_storage() -> SchemaStorage: C = resolve_configuration( - SchemaVolumeConfiguration(), + SchemaStorageConfiguration(), explicit_value={ "import_schema_path": "tests/common/cases/schemas/rasa", "external_schema_format": "json" diff --git a/tests/common/storages/test_loader_storage.py b/tests/common/storages/test_loader_storage.py index 65e981714f..905e6cfcdb 100644 --- a/tests/common/storages/test_loader_storage.py +++ b/tests/common/storages/test_loader_storage.py @@ -7,7 +7,7 @@ from dlt.common.schema import Schema, TSchemaTables from dlt.common.storages.load_storage import LoadPackageInfo, LoadStorage, ParsedLoadJobFileName, TJobState from dlt.common.configuration import resolve_configuration -from dlt.common.configuration.specs import LoadVolumeConfiguration +from dlt.common.storages import LoadStorageConfiguration from dlt.common.storages.exceptions import LoadPackageNotFound, NoMigrationPathException from dlt.common.typing import StrAny from dlt.common.utils import uniq_id @@ -17,7 +17,7 @@ @pytest.fixture def storage() -> LoadStorage: - C = resolve_configuration(LoadVolumeConfiguration()) + C = resolve_configuration(LoadStorageConfiguration()) s = LoadStorage(True, "jsonl", LoadStorage.ALL_SUPPORTED_FILE_FORMATS, C) return s diff --git a/tests/common/storages/test_normalize_storage.py b/tests/common/storages/test_normalize_storage.py index 8deb472140..678e1e49fe 100644 --- a/tests/common/storages/test_normalize_storage.py +++ b/tests/common/storages/test_normalize_storage.py @@ -1,9 +1,8 @@ import pytest from dlt.common.utils import uniq_id -from dlt.common.storages import NormalizeStorage +from dlt.common.storages import NormalizeStorage, NormalizeStorageConfiguration from dlt.common.storages.exceptions import NoMigrationPathException -from dlt.common.configuration.specs import NormalizeVolumeConfiguration from dlt.common.storages.normalize_storage import TParsedNormalizeFileName from tests.utils import write_version, autouse_test_storage diff --git a/tests/common/storages/test_schema_storage.py b/tests/common/storages/test_schema_storage.py index a84d59016f..330400c049 100644 --- a/tests/common/storages/test_schema_storage.py +++ b/tests/common/storages/test_schema_storage.py @@ -7,9 +7,8 @@ from dlt.common.schema.schema import Schema from dlt.common.schema.typing import TStoredSchema from dlt.common.schema.utils import default_normalizers -from dlt.common.configuration.specs import SchemaVolumeConfiguration from dlt.common.storages.exceptions import InStorageSchemaModified, SchemaNotFoundError -from dlt.common.storages import SchemaStorage, LiveSchemaStorage, FileStorage +from dlt.common.storages import SchemaStorageConfiguration, SchemaStorage, LiveSchemaStorage, FileStorage from tests.utils import autouse_test_storage, TEST_STORAGE_ROOT from tests.common.utils import load_yml_case, yml_case_path, COMMON_TEST_CASES_PATH, IMPORTED_VERSION_HASH_ETH_V5 @@ -17,22 +16,22 @@ @pytest.fixture def storage() -> SchemaStorage: - return init_storage(SchemaVolumeConfiguration()) + return init_storage(SchemaStorageConfiguration()) @pytest.fixture def synced_storage() -> SchemaStorage: # will be created in /schemas - return init_storage(SchemaVolumeConfiguration(import_schema_path=TEST_STORAGE_ROOT + "/import", 
export_schema_path=TEST_STORAGE_ROOT + "/import")) + return init_storage(SchemaStorageConfiguration(import_schema_path=TEST_STORAGE_ROOT + "/import", export_schema_path=TEST_STORAGE_ROOT + "/import")) @pytest.fixture def ie_storage() -> SchemaStorage: # will be created in /schemas - return init_storage(SchemaVolumeConfiguration(import_schema_path=TEST_STORAGE_ROOT + "/import", export_schema_path=TEST_STORAGE_ROOT + "/export")) + return init_storage(SchemaStorageConfiguration(import_schema_path=TEST_STORAGE_ROOT + "/import", export_schema_path=TEST_STORAGE_ROOT + "/export")) -def init_storage(C: SchemaVolumeConfiguration) -> SchemaStorage: +def init_storage(C: SchemaStorageConfiguration) -> SchemaStorage: # use live schema storage for test which must be backward compatible with schema storage s = LiveSchemaStorage(C, makedirs=True) assert C is s.config @@ -50,7 +49,7 @@ def test_load_non_existing(storage: SchemaStorage) -> None: def test_load_schema_with_upgrade() -> None: # point the storage root to v4 schema google_spreadsheet_v3.schema - storage = LiveSchemaStorage(SchemaVolumeConfiguration(COMMON_TEST_CASES_PATH + "schemas/sheets")) + storage = LiveSchemaStorage(SchemaStorageConfiguration(COMMON_TEST_CASES_PATH + "schemas/sheets")) # the hash when computed on the schema does not match the version_hash in the file so it should raise InStorageSchemaModified # but because the version upgrade is required, the check is skipped and the load succeeds storage.load_schema("google_spreadsheet_v4") diff --git a/tests/conftest.py b/tests/conftest.py index 9d7e6db9c2..13b709fa2f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,24 +9,25 @@ def pytest_configure(config): # the dataclass implementation will use those patched values when creating instances (the values present # in the declaration are not frozen allowing patching) - from dlt.common.configuration.specs import normalize_volume_configuration, run_configuration, load_volume_configuration, schema_volume_configuration + from dlt.common.configuration.specs import run_configuration + from dlt.common.storages import configuration as storage_configuration test_storage_root = "_storage" run_configuration.RunConfiguration.config_files_storage_path = os.path.join(test_storage_root, "config/") run_configuration.RunConfiguration.dlthub_telemetry_segment_write_key = "TLJiyRkGVZGCi2TtjClamXpFcxAA1rSB" - load_volume_configuration.LoadVolumeConfiguration.load_volume_path = os.path.join(test_storage_root, "load") - delattr(load_volume_configuration.LoadVolumeConfiguration, "__init__") - load_volume_configuration.LoadVolumeConfiguration = dataclasses.dataclass(load_volume_configuration.LoadVolumeConfiguration, init=True, repr=False) + storage_configuration.LoadStorageConfiguration.load_volume_path = os.path.join(test_storage_root, "load") + delattr(storage_configuration.LoadStorageConfiguration, "__init__") + storage_configuration.LoadStorageConfiguration = dataclasses.dataclass(storage_configuration.LoadStorageConfiguration, init=True, repr=False) - normalize_volume_configuration.NormalizeVolumeConfiguration.normalize_volume_path = os.path.join(test_storage_root, "normalize") + storage_configuration.NormalizeStorageConfiguration.normalize_volume_path = os.path.join(test_storage_root, "normalize") # delete __init__, otherwise it will not be recreated by dataclass - delattr(normalize_volume_configuration.NormalizeVolumeConfiguration, "__init__") - normalize_volume_configuration.NormalizeVolumeConfiguration = 
dataclasses.dataclass(normalize_volume_configuration.NormalizeVolumeConfiguration, init=True, repr=False) + delattr(storage_configuration.NormalizeStorageConfiguration, "__init__") + storage_configuration.NormalizeStorageConfiguration = dataclasses.dataclass(storage_configuration.NormalizeStorageConfiguration, init=True, repr=False) - schema_volume_configuration.SchemaVolumeConfiguration.schema_volume_path = os.path.join(test_storage_root, "schemas") - delattr(schema_volume_configuration.SchemaVolumeConfiguration, "__init__") - schema_volume_configuration.SchemaVolumeConfiguration = dataclasses.dataclass(schema_volume_configuration.SchemaVolumeConfiguration, init=True, repr=False) + storage_configuration.SchemaStorageConfiguration.schema_volume_path = os.path.join(test_storage_root, "schemas") + delattr(storage_configuration.SchemaStorageConfiguration, "__init__") + storage_configuration.SchemaStorageConfiguration = dataclasses.dataclass(storage_configuration.SchemaStorageConfiguration, init=True, repr=False) assert run_configuration.RunConfiguration.config_files_storage_path == os.path.join(test_storage_root, "config/") diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index 8217fe5b29..6fa0458b05 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -9,11 +9,11 @@ from dlt.common.configuration.specs.config_section_context import ConfigSectionContext from dlt.common.exceptions import DictValidationException from dlt.common.pipeline import StateInjectableContext, TPipelineState +from dlt.common.source import _SOURCES from dlt.common.schema import Schema from dlt.common.schema.utils import new_table from dlt.cli.source_detection import detect_source_configs -from dlt.extract.decorators import _SOURCES from dlt.extract.exceptions import InvalidResourceDataTypeFunctionNotAGenerator, InvalidResourceDataTypeIsNone, ParametrizedResourceUnbound, PipeNotBoundToData, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable from dlt.extract.source import DltResource, DltSource @@ -109,6 +109,23 @@ def empty_t_1(items, _meta): list(empty_t_1) +def test_transformer_no_parens() -> None: + bound_r = dlt.resource([1, 2, 3], name="data") + + @dlt.transformer + def empty_t_1(item, meta = None): + yield "a" * item + + assert list(bound_r | empty_t_1) == ["a", "aa", "aaa"] + + def empty_t_2(item, _meta): + yield _meta * item + + # create dynamic transformer with explicit func + t = dlt.transformer(empty_t_2, data_from=bound_r) + assert list(t("m")) == ["m", "mm", "mmm"] + + def test_source_name_is_invalid_schema_name() -> None: # inferred from function name, names must be small caps etc. 
@@ -171,7 +188,7 @@ def some_data(): r = dlt.resource(some_data()) assert r.name == "some_data" - assert r.section is None + assert r.section == "test_decorators" def test_source_sections() -> None: diff --git a/tests/extract/test_extract.py b/tests/extract/test_extract.py index 91aea24434..69787bb9f9 100644 --- a/tests/extract/test_extract.py +++ b/tests/extract/test_extract.py @@ -1,6 +1,6 @@ import dlt from dlt.common import json -from dlt.common.configuration.specs import NormalizeVolumeConfiguration +from dlt.common.storages import NormalizeStorageConfiguration from dlt.extract.extract import ExtractorStorage, extract from dlt.extract.source import DltResource, DltSource @@ -16,7 +16,7 @@ def expect_tables(resource: DltResource) -> dlt.Schema: source = DltSource("selectables", "module", dlt.Schema("selectables"), [resource(10)]) schema = source.discover_schema() - storage = ExtractorStorage(NormalizeVolumeConfiguration()) + storage = ExtractorStorage(NormalizeStorageConfiguration()) extract_id = storage.create_extract_id() schema_update = extract(extract_id, source, storage) # odd and even tables @@ -36,10 +36,11 @@ def expect_tables(resource: DltResource) -> dlt.Schema: # delete files clean_test_storage() - storage = ExtractorStorage(NormalizeVolumeConfiguration()) + storage = ExtractorStorage(NormalizeStorageConfiguration()) # same thing but select only odd source = DltSource("selectables", "module", dlt.Schema("selectables"), [resource]) - source.with_resources(resource.name).selected_resources[resource.name].bind(10).select_tables("odd_table") + source = source.with_resources(resource.name) + source.selected_resources[resource.name].bind(10).select_tables("odd_table") extract_id = storage.create_extract_id() schema_update = extract(extract_id, source, storage) assert len(schema_update) == 1 diff --git a/tests/extract/test_incremental.py b/tests/extract/test_incremental.py index cdfcc38226..3bb400c9c9 100644 --- a/tests/extract/test_incremental.py +++ b/tests/extract/test_incremental.py @@ -10,7 +10,7 @@ from dlt.common.configuration.container import Container from dlt.common.configuration.specs.base_configuration import configspec, BaseConfiguration from dlt.common.pendulum import pendulum, timedelta -from dlt.common.pipeline import StateInjectableContext, _resource_state +from dlt.common.pipeline import StateInjectableContext, resource_state from dlt.common.schema.schema import Schema from dlt.common.utils import uniq_id, digest128 from dlt.common.json import json @@ -471,7 +471,7 @@ def test_replace_resets_state() -> None: parent_r = standalone_some_data(now) @dlt.transformer(data_from=parent_r, write_disposition="append") def child(item): - state = _resource_state("child") + state = resource_state("child") print(f"CHILD: {state}") state["mark"] = f"mark:{item['delta']}" yield item diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index b5e8be0efa..3efc52f918 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -89,49 +89,49 @@ def bad_transformer_3(*, item): # transformer must be created on a callable with at least one argument with pytest.raises(InvalidTransformerDataTypeGeneratorFunctionRequired): - dlt.transformer(r)("a") + dlt.transformer(data_from=r)("a") with pytest.raises(InvalidTransformerDataTypeGeneratorFunctionRequired): - dlt.transformer(r)(bad_transformer()) + dlt.transformer(data_from=r)(bad_transformer()) # transformer must take at least one arg with pytest.raises(InvalidTransformerGeneratorFunction) as py_ex: 
- dlt.transformer(r)(bad_transformer) + dlt.transformer(data_from=r)(bad_transformer) assert py_ex.value.code == 1 # transformer may have only one positional argument and it must be first with pytest.raises(InvalidTransformerGeneratorFunction) as py_ex: - dlt.transformer(r)(bad_transformer_2) + dlt.transformer(data_from=r)(bad_transformer_2) assert py_ex.value.code == 2 # first argument cannot be kw only with pytest.raises(InvalidTransformerGeneratorFunction) as py_ex: - dlt.transformer(r)(bad_transformer_3) + dlt.transformer(data_from=r)(bad_transformer_3) assert py_ex.value.code == 3 # transformer must take data from a resource - with pytest.raises(InvalidTransformerGeneratorFunction): - dlt.transformer(bad_transformer)(good_transformer) + with pytest.raises(InvalidParentResourceIsAFunction): + dlt.transformer(data_from=bad_transformer)(good_transformer) with pytest.raises(InvalidParentResourceDataType): - dlt.transformer(bad_transformer())(good_transformer) + dlt.transformer(data_from=bad_transformer())(good_transformer) # transformer is unbound r = dlt.resource(["itemX", "itemY"], name="items") - t = dlt.transformer(r)(good_transformer) + t = dlt.transformer(data_from=r)(good_transformer) with pytest.raises(ParametrizedResourceUnbound): list(t) # pass wrong arguments r = dlt.resource(["itemX", "itemY"], name="items") - t = dlt.transformer(r)(good_transformer) + t = dlt.transformer(data_from=r)(good_transformer) with pytest.raises(TypeError): list(t("p1", 1, 2, 3, 4)) # pass arguments that fully bind the item r = dlt.resource(["itemX", "itemY"], name="items") - t = dlt.transformer(r)(good_transformer) + t = dlt.transformer(data_from=r)(good_transformer) with pytest.raises(TypeError): t(item={}, p1="p2", p2=1) r = dlt.resource(["itemX", "itemY"], name="items") - t = dlt.transformer(r)(good_transformer) + t = dlt.transformer(data_from=r)(good_transformer) items = list(t(p1="p1", p2=2)) def assert_items(_items: TDataItems) -> None: @@ -144,7 +144,7 @@ def assert_items(_items: TDataItems) -> None: # parameters passed as args r = dlt.resource(["itemX", "itemY"], name="items") - t = dlt.transformer(r)(good_transformer) + t = dlt.transformer(data_from=r)(good_transformer) items = list(t("p1", 2)) assert_items(items) @@ -296,15 +296,15 @@ def test_resource_bind_lazy_eval() -> None: def needs_param(param): yield from range(param) - @dlt.transformer(needs_param(3)) + @dlt.transformer(data_from=needs_param(3)) def tx_form(item, multi): yield item*multi - @dlt.transformer(tx_form(2)) + @dlt.transformer(data_from=tx_form(2)) def tx_form_fin(item, div): yield item / div - @dlt.transformer(needs_param) + @dlt.transformer(data_from=needs_param) def tx_form_dir(item, multi): yield item*multi @@ -372,19 +372,70 @@ def test_source(no_resources): # successful select s_sel = s.with_resources("resource_1", "resource_7") - # returns self - assert s is s_sel - assert list(s.selected_resources) == ["resource_1", "resource_7"] == list(s.resources.selected) - assert list(s.resources) == all_resource_names - info = str(s) + # returns a clone + assert s is not s_sel + assert list(s_sel.selected_resources) == ["resource_1", "resource_7"] == list(s_sel.resources.selected) + assert list(s_sel.resources) == all_resource_names + info = str(s_sel) assert "resource resource_0 is not selected" in info + # original is not affected + assert list(s.selected_resources) == all_resource_names # reselect assert list(s.with_resources("resource_8").selected_resources) == ["resource_8"] # nothing selected assert 
list(s.with_resources().selected_resources) == [] # nothing is selected so nothing yielded - assert list(s) == [] + assert list(s.with_resources()) == [] + + +def test_clone_source() -> None: + @dlt.source + def test_source(no_resources): + + def _gen(i): + yield "A" * i + + for i in range(no_resources): + yield dlt.resource(_gen(i), name="resource_" + str(i)) + + s = test_source(4) + all_resource_names = ["resource_" + str(i) for i in range(4)] + clone_s = s.clone() + assert len(s.resources) == len(clone_s.resources) == len(all_resource_names) + assert s.schema is not clone_s.schema + for name in all_resource_names: + # resource is a clone + assert s.resources[name] is not clone_s.resources[name] + assert s.resources[name]._pipe is not clone_s.resources[name]._pipe + # but we keep pipe ids + assert s.resources[name]._pipe._pipe_id == clone_s.resources[name]._pipe._pipe_id + + assert list(s) == ['', 'A', 'AA', 'AAA'] + # we expired generators + assert list(clone_s) == [] + + # clone parametrized generators + + @dlt.source + def test_source(no_resources): + + def _gen(i): + yield "A" * i + + for i in range(no_resources): + yield dlt.resource(_gen, name="resource_" + str(i)) + + s = test_source(4) + clone_s = s.clone() + # bind resources + for idx, name in enumerate(all_resource_names): + s.resources[name].bind(idx) + clone_s.resources[name].bind(idx) + + # now thanks to late eval both sources evaluate separately + assert list(s) == ['', 'A', 'AA', 'AAA'] + assert list(clone_s) == ['', 'A', 'AA', 'AAA'] def test_multiple_parametrized_transformers() -> None: diff --git a/tests/load/pipeline/test_drop.py b/tests/load/pipeline/test_drop.py index 146a3f41bd..8e6b8f7338 100644 --- a/tests/load/pipeline/test_drop.py +++ b/tests/load/pipeline/test_drop.py @@ -10,8 +10,7 @@ from dlt.common.utils import uniq_id from dlt.pipeline import helpers, state_sync, Pipeline from dlt.load import Load -from dlt.pipeline.exceptions import PipelineHasPendingDataException, PipelineStepFailed -from dlt.common.pipeline import _resource_state +from dlt.pipeline.exceptions import PipelineStepFailed from dlt.destinations.job_client_impl import SqlJobClientBase from tests.load.pipeline.utils import drop_pipeline diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py index feeb2a1b05..6ca87b9f10 100644 --- a/tests/load/pipeline/test_merge_disposition.py +++ b/tests/load/pipeline/test_merge_disposition.py @@ -9,7 +9,7 @@ from dlt.common import json, pendulum from dlt.common.configuration.container import Container -from dlt.common.pipeline import StateInjectableContext, _resource_state +from dlt.common.pipeline import StateInjectableContext from dlt.common.typing import AnyFun, StrAny from dlt.common.utils import digest128 from dlt.extract.source import DltResource diff --git a/tests/load/utils.py b/tests/load/utils.py index 6867d2db1d..bf15eb35a3 100644 --- a/tests/load/utils.py +++ b/tests/load/utils.py @@ -7,14 +7,13 @@ from dlt.common import json, Decimal, sleep from dlt.common.configuration import resolve_configuration from dlt.common.configuration.container import Container -from dlt.common.configuration.specs import SchemaVolumeConfiguration from dlt.common.configuration.specs.config_section_context import ConfigSectionContext from dlt.common.destination.reference import DestinationClientDwhConfiguration, DestinationReference, JobClientBase, LoadJob from dlt.common.data_writers import DataWriter from dlt.common.schema import TColumnSchema, TTableSchemaColumns 
-from dlt.common.storages import SchemaStorage, FileStorage -from dlt.common.schema.utils import new_table +from dlt.common.storages import SchemaStorage, FileStorage, SchemaStorageConfiguration from dlt.common.storages.load_storage import ParsedLoadJobFileName +from dlt.common.schema.utils import new_table from dlt.common.typing import StrAny from dlt.common.utils import uniq_id @@ -149,7 +148,7 @@ def yield_client( # also apply to config config.update(default_config_values) # get event default schema - C = resolve_configuration(SchemaVolumeConfiguration(), explicit_value={ + C = resolve_configuration(SchemaStorageConfiguration(), explicit_value={ "schema_volume_path": "tests/common/cases/schemas/rasa" }) schema_storage = SchemaStorage(C) diff --git a/tests/pipeline/test_pipeline_state.py b/tests/pipeline/test_pipeline_state.py index 0eab281e63..853abb56d3 100644 --- a/tests/pipeline/test_pipeline_state.py +++ b/tests/pipeline/test_pipeline_state.py @@ -4,13 +4,14 @@ import dlt -from dlt.common.exceptions import PipelineStateNotAvailable +from dlt.common.exceptions import PipelineStateNotAvailable, ResourceNameNotAvailable from dlt.common.schema import Schema +from dlt.common.source import get_current_pipe_name from dlt.common.storages import FileStorage from dlt.common import pipeline as state_module from dlt.common.utils import uniq_id -from dlt.pipeline.exceptions import PipelineStateEngineNoUpgradePathException +from dlt.pipeline.exceptions import PipelineStateEngineNoUpgradePathException, PipelineStepFailed from dlt.pipeline.pipeline import Pipeline from dlt.pipeline.state_sync import migrate_state, STATE_ENGINE_VERSION @@ -25,6 +26,13 @@ def some_data(): dlt.current.source_state()["last_value"] = last_value + 1 +@dlt.resource() +def some_data_resource_state(): + last_value = dlt.current.resource_state().get("last_value", 0) + yield [1,2,3] + dlt.current.resource_state()["last_value"] = last_value + 1 + + def test_managed_state() -> None: p = dlt.pipeline(pipeline_name="managed_state") p.extract(some_data) @@ -149,9 +157,9 @@ def test_unmanaged_state() -> None: def _gen_inner(): dlt.state()["gen"] = True yield 1 - + list(dlt.resource(_gen_inner)) list(dlt.resource(_gen_inner())) - assert state_module._last_full_state["sources"]["unmanaged"]["gen"] is True + assert state_module._last_full_state["sources"]["test_pipeline_state"]["gen"] is True @dlt.source def some_source(): @@ -185,7 +193,7 @@ def test_unmanaged_state_no_pipeline() -> None: assert state_module._last_full_state["sources"]["test_pipeline_state"]["last_value"] == 1 def _gen_inner(): - dlt.state()["gen"] = True + dlt.current.state()["gen"] = True yield 1 list(dlt.resource(_gen_inner())) @@ -193,6 +201,180 @@ def _gen_inner(): assert state_module._last_full_state["sources"][fk]["gen"] is True +def test_resource_state_write() -> None: + r = some_data_resource_state() + assert list(r) == [1, 2, 3] + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["some_data_resource_state"]["last_value"] == 1 + with pytest.raises(ResourceNameNotAvailable): + get_current_pipe_name() + + def _gen_inner(): + dlt.current.resource_state()["gen"] = True + yield 1 + + dlt.pipeline() + r = dlt.resource(_gen_inner(), name="name_ovrd") + assert list(r) == [1] + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["name_ovrd"]["gen"] is True + with pytest.raises(ResourceNameNotAvailable): + get_current_pipe_name() + + +def test_resource_state_in_pipeline() -> None: + p = dlt.pipeline() + 
r = some_data_resource_state() + p.extract(r) + assert r.state["last_value"] == 1 + with pytest.raises(ResourceNameNotAvailable): + get_current_pipe_name() + + def _gen_inner(tv="df"): + dlt.current.resource_state()["gen"] = tv + yield 1 + + r = dlt.resource(_gen_inner("gen_tf"), name="name_ovrd") + p.extract(r) + assert r.state["gen"] == "gen_tf" + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["name_ovrd"]["gen"] == "gen_tf" + with pytest.raises(ResourceNameNotAvailable): + get_current_pipe_name() + + r = dlt.resource(_gen_inner, name="pure_function") + p.extract(r) + assert r.state["gen"] == "df" + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["pure_function"]["gen"] == "df" + with pytest.raises(ResourceNameNotAvailable): + get_current_pipe_name() + + # get resource state in defer function + def _gen_inner_defer(tv="df"): + + @dlt.defer + def _run(): + dlt.current.resource_state()["gen"] = tv + return 1 + + yield _run() + + r = dlt.resource(_gen_inner_defer, name="defer_function") + # you cannot get resource name in `defer` function + with pytest.raises(PipelineStepFailed) as pip_ex: + p.extract(r) + assert isinstance(pip_ex.value.__context__, ResourceNameNotAvailable) + + # get resource state in defer explicitly + def _gen_inner_defer_explicit_name(resource_name, tv="df"): + + @dlt.defer + def _run(): + dlt.current.resource_state(resource_name)["gen"] = tv + return 1 + + yield _run() + + r = dlt.resource(_gen_inner_defer_explicit_name, name="defer_function_explicit") + p.extract(r("defer_function_explicit", "expl")) + assert r.state["gen"] == "expl" + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["defer_function_explicit"]["gen"] == "expl" + + # get resource state in yielding defer (which btw is invalid and will be resolved in main thread) + def _gen_inner_defer_yielding(tv="yielding"): + + @dlt.defer + def _run(): + dlt.current.resource_state()["gen"] = tv + yield from [1, 2, 3] + + yield _run() + + r = dlt.resource(_gen_inner_defer_yielding, name="defer_function_yielding") + p.extract(r) + assert r.state["gen"] == "yielding" + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["defer_function_yielding"]["gen"] == "yielding" + + # get resource state in async function + def _gen_inner_async(tv="async"): + + async def _run(): + dlt.current.resource_state()["gen"] = tv + return 1 + + yield _run() + + r = dlt.resource(_gen_inner_async, name="async_function") + # you cannot get resource name in `defer` function + with pytest.raises(PipelineStepFailed) as pip_ex: + p.extract(r) + assert isinstance(pip_ex.value.__context__, ResourceNameNotAvailable) + + +def test_transformer_state_write() -> None: + r = some_data_resource_state() + + # yielding transformer + def _gen_inner(item): + dlt.current.resource_state()["gen"] = True + yield map(lambda i: i * 2, item) + + # p = dlt.pipeline() + # p.extract(dlt.transformer(_gen_inner, data_from=r, name="tx_other_name")) + assert list(dlt.transformer(_gen_inner, data_from=r, name="tx_other_name")) == [2, 4, 6] + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["some_data_resource_state"]["last_value"] == 1 + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["tx_other_name"]["gen"] is True + + # returning transformer + def _gen_inner_rv(item): + dlt.current.resource_state()["gen"] = True + return item * 2 + + r = some_data_resource_state() + 
assert list(dlt.transformer(_gen_inner_rv, data_from=r, name="tx_other_name_rv")) == [1, 2, 3, 1, 2, 3] + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["tx_other_name_rv"]["gen"] is True + + # deferred transformer + @dlt.defer + def _gen_inner_rv_defer(item): + dlt.current.resource_state()["gen"] = True + return item + + r = some_data_resource_state() + # not available because executed in a pool + with pytest.raises(ResourceNameNotAvailable): + print(list(dlt.transformer(_gen_inner_rv_defer, data_from=r, name="tx_other_name_defer"))) + + # async transformer + async def _gen_inner_rv_async(item): + dlt.current.resource_state()["gen"] = True + return item + + r = some_data_resource_state() + # not available because executed in a pool + with pytest.raises(ResourceNameNotAvailable): + print(list(dlt.transformer(_gen_inner_rv_async, data_from=r, name="tx_other_name_async"))) + + # async transformer with explicit resource name + async def _gen_inner_rv_async_name(item, r_name): + dlt.current.resource_state(r_name)["gen"] = True + return item + + r = some_data_resource_state() + assert list(dlt.transformer(_gen_inner_rv_async_name, data_from=r, name="tx_other_name_async")("tx_other_name_async")) == [1, 2, 3] + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["tx_other_name_async"]["gen"] is True + + +def test_transform_function_state_write() -> None: + r = some_data_resource_state() + + # transform executed within the same thread + def transform(item): + dlt.current.resource_state()["form"] = item + return item*2 + + r.add_map(transform) + assert list(r) == [2, 4, 6] + assert state_module._last_full_state["sources"]["test_pipeline_state"]["resources"]["some_data_resource_state"]["form"] == 3 + + def test_migrate_state(test_storage: FileStorage) -> None: state_v1 = load_json_case("state/state.v1") state = migrate_state("test_pipeline", state_v1, state_v1["_state_engine_version"], STATE_ENGINE_VERSION) diff --git a/tests/tools/create_storages.py b/tests/tools/create_storages.py index efbc8cf8ff..4f0abe3512 100644 --- a/tests/tools/create_storages.py +++ b/tests/tools/create_storages.py @@ -1,5 +1,4 @@ -from dlt.common.storages import NormalizeStorage, LoadStorage, SchemaStorage -from dlt.common.configuration.specs import NormalizeVolumeConfiguration, LoadVolumeConfiguration, SchemaVolumeConfiguration +from dlt.common.storages import NormalizeStorage, LoadStorage, SchemaStorage, NormalizeStorageConfiguration, LoadStorageConfiguration, SchemaStorageConfiguration # NormalizeStorage(True, NormalizeVolumeConfiguration)
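Reviewer note on the `test_setter` additions above: a minimal usage sketch of the writable config/secrets accessors from a pipeline script's point of view. It assumes the default `config.toml`/`secrets.toml` providers are active (as the test asserts via `writable_provider`); all key names and values below are illustrative, not part of this patch.

```python
import dlt
from dlt.common.configuration.resolve import resolve_configuration
from dlt.common.runners.configuration import PoolRunnerConfiguration

# plain assignment routes through the writable providers; dotted keys create nested sections
dlt.config["my_section.chunk_size"] = 1000
dlt.secrets["sources.my_source.credentials"] = {"api_key": "please-rotate-me"}

# reads go through the regular provider chain, so the written values are visible right away
assert dlt.config["my_section.chunk_size"] == 1000
assert dlt.secrets["sources.my_source.credentials"] == {"api_key": "please-rotate-me"}

# written values take part in configuration resolution like any other provider value
dlt.config["pool"] = {"pool_type": "thread", "workers": 4}
pool_config = resolve_configuration(PoolRunnerConfiguration(), sections=("pool",))
assert pool_config.pool_type == "thread"
```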
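For the transformer changes exercised by `test_transformer_no_parens` and the `data_from=` rewrites in `test_sources.py`: a small sketch of the decorator usage, with made-up resource and function names. This mirrors the tests above and is not an exhaustive description of the API.

```python
import dlt

numbers = dlt.resource([1, 2, 3], name="numbers")

# the bare decorator (no parentheses) now works; the parent is bound later via the pipe operator
@dlt.transformer
def doubled(item, meta=None):
    yield item * 2

assert list(numbers | doubled) == [2, 4, 6]

# a transformer can also be created from a plain function, binding the parent with data_from
def multiplied(item, factor):
    yield item * factor

t = dlt.transformer(multiplied, data_from=numbers)
# extra parameters after the item are bound by calling the transformer
assert list(t(10)) == [10, 20, 30]
```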
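The `test_source` and `test_clone_source` changes above reflect a behavioral change: `with_resources` now returns a clone instead of mutating the source in place. A sketch of what that means for user code follows; the source and resource names are invented for illustration.

```python
import dlt

@dlt.source
def my_source():
    for name in ["users", "orders"]:
        yield dlt.resource([{"table": name}], name=name)

s = my_source()

# with_resources returns a clone with a narrowed selection; the original keeps everything selected
users_only = s.with_resources("users")
assert list(users_only.selected_resources) == ["users"]
assert list(s.selected_resources) == ["users", "orders"]

# an explicit clone copies the schema and the resource pipes as well
s2 = s.clone()
assert s2.schema is not s.schema
```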
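Finally, for context on the `resource_state` tests added above: a minimal sketch of the `dlt.current.resource_state` accessor inside a resource generator. Pipeline and resource names are illustrative; only `extract` is run so no destination is needed.

```python
import dlt

@dlt.resource
def incremental_numbers():
    # per-resource state slot, scoped to this resource's name
    last = dlt.current.resource_state().get("last_value", 0)
    yield list(range(last, last + 3))
    dlt.current.resource_state()["last_value"] = last + 3

pipeline = dlt.pipeline(pipeline_name="state_demo")
r = incremental_numbers()
pipeline.extract(r)
# the written slot is visible on the bound resource after extraction
print(r.state)

# note: inside @dlt.defer or async callables the resource name cannot be inferred
# automatically; pass it explicitly, e.g. dlt.current.resource_state("incremental_numbers")
```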