
Commit

Merge pull request #312 from dlt-hub/rfix/progress-bars-docs
progress bars docs
rudolfix authored Apr 30, 2023
2 parents 80fdb0c + c9487b6 commit 89c57dd
Showing 19 changed files with 298 additions and 51 deletions.
11 changes: 8 additions & 3 deletions dlt/cli/_dlt.py
@@ -14,7 +14,7 @@
from dlt.cli import utils
from dlt.cli.init_command import init_command, list_pipelines_command, DLT_INIT_DOCS_URL, DEFAULT_PIPELINES_REPO
from dlt.cli.deploy_command import PipelineWasNotRun, deploy_command, DLT_DEPLOY_DOCS_URL
from dlt.cli.pipeline_command import pipeline_command
from dlt.cli.pipeline_command import pipeline_command, DLT_PIPELINE_COMMAND_DOCS_URL
from dlt.pipeline.exceptions import CannotRestorePipelineException


@@ -225,8 +225,13 @@ def main() -> int:
pipe_cmd_schema.add_argument("--format", choices=["json", "yaml"], default="yaml", help="Display schema in this format")
pipe_cmd_schema.add_argument("--remove-defaults", action="store_true", help="Does not show default hint values")

pipe_cmd_drop = pipeline_subparsers.add_parser("drop", help="Drop pipeline state and resource tables", parents=[pipe_cmd_sync_parent])
pipe_cmd_drop.add_argument("resources", nargs="*", help="One or more resources to drop. Can be exact resource name(s) or regex pattern(s).")
pipe_cmd_drop = pipeline_subparsers.add_parser(
"drop",
help="Selectively drop tables and reset state",
parents=[pipe_cmd_sync_parent],
epilog=f"See {DLT_PIPELINE_COMMAND_DOCS_URL}#selectively-drop-tables-and-reset-state for more info"
)
pipe_cmd_drop.add_argument("resources", nargs="*", help="One or more resources to drop. Can be exact resource name(s) or regex pattern(s). Regex patterns must start with re:")
pipe_cmd_drop.add_argument("--drop-all", action="store_true", default=False, help="Drop all resources found in schema. Supersedes [resources] argument.")
pipe_cmd_drop.add_argument("--state-paths", nargs="*", help="State keys or json paths to drop", default=())
pipe_cmd_drop.add_argument("--schema", help="Schema name to drop from (if other than default schema).", dest="schema_name")
4 changes: 4 additions & 0 deletions dlt/cli/echo.py
@@ -30,6 +30,10 @@ def bold(msg: str) -> str:
return click.style(msg, bold=True, reset=True)


def error(msg: str) -> None:
click.secho("ERROR: " + msg, fg="red")


def warning(msg: str) -> None:
click.secho("WARNING: " + msg, fg="yellow")

59 changes: 50 additions & 9 deletions dlt/cli/pipeline_command.py
@@ -4,18 +4,20 @@
from dlt.cli.exceptions import CliCommandException

from dlt.common import json
from dlt.common.pipeline import get_dlt_pipelines_dir, TSourceState
from dlt.common.pipeline import _resource_state, get_dlt_pipelines_dir, TSourceState
from dlt.common.destination.reference import TDestinationReferenceArg
from dlt.common.runners import Venv
from dlt.common.runners.stdout import iter_stdout
from dlt.common.schema.utils import remove_defaults
from dlt.common.schema.utils import group_tables_by_resource, remove_defaults
from dlt.common.storages.file_storage import FileStorage
from dlt.common.typing import DictStrAny
from dlt.pipeline.helpers import DropCommand
from dlt.pipeline.exceptions import CannotRestorePipelineException

from dlt.cli import echo as fmt

DLT_PIPELINE_COMMAND_DOCS_URL = "https://dlthub.com/docs/reference/command-line-interface"


def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, verbosity: int, dataset_name: str = None, destination: TDestinationReferenceArg = None, **command_kwargs: Any) -> None:
if operation == "list":
@@ -35,13 +37,18 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
except CannotRestorePipelineException as e:
if operation not in {"sync", "drop"}:
raise
fmt.echo(e)
fmt.warning(str(e))
if not fmt.confirm("Do you want to attempt to restore the pipeline state from destination?", default=False):
return
destination = destination or fmt.text_input(f"Enter destination name for pipeline {fmt.bold(pipeline_name)}")
dataset_name = dataset_name or fmt.text_input(f"Enter dataset name for pipeline {fmt.bold(pipeline_name)}")
p = dlt.pipeline(pipeline_name, pipelines_dir, destination=destination, dataset_name=dataset_name)
p.sync_destination()
if p.first_run:
# remote state was not found
p._wipe_working_folder()
fmt.error(f"Pipeline {pipeline_name} was not found in dataset {dataset_name} in {destination}")
return
if operation == "sync":
return # No need to sync again

@@ -62,11 +69,12 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
for k, v in state.items():
if not isinstance(v, dict):
fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v))
if state.get("sources"):
sources_state = state.get("sources")
if sources_state:
fmt.echo()
fmt.secho("sources:", fg="green")
if verbosity > 0:
fmt.echo(json.dumps(state["sources"], pretty=True))
fmt.echo(json.dumps(sources_state, pretty=True))
else:
fmt.echo("Add -v option to see sources state. Note that it could be large.")

@@ -76,6 +84,23 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
if not isinstance(v, dict):
fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v))
fmt.echo()
if p.default_schema_name is None:
fmt.warning("This pipeline does not have a default schema")
else:
is_single_schema = len(p.schema_names)
for schema_name in p.schema_names:
fmt.echo("Resources in schema: %s" % fmt.bold(schema_name))
schema = p.schemas[schema_name]
data_tables = {t["name"]: t for t in schema.data_tables()}
for resource_name, tables in group_tables_by_resource(data_tables).items():
res_state_slots = 0
if sources_state:
source_state = next(iter(sources_state.items()))[1] if is_single_schema else sources_state.get(schema_name)
if source_state:
resource_state = _resource_state(resource_name, source_state)
res_state_slots = len(resource_state)
fmt.echo("%s with %s table(s) and %s resource state slot(s)" % (fmt.bold(resource_name), fmt.bold(str(len(tables))), fmt.bold(str(res_state_slots))))
fmt.echo()
fmt.echo("Working dir content:")
extracted_files = p.list_extracted_resources()
if extracted_files:
@@ -146,7 +171,7 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver
if verbosity == 0:
print("Add -v option to see schema update. Note that it could be large.")
else:
tables = remove_defaults({"tables": package_info.schema_update})
tables = remove_defaults({"tables": package_info.schema_update}) # type: ignore
fmt.echo(fmt.bold("Schema update:"))
fmt.echo(yaml.dump(tables, allow_unicode=True, default_flow_style=False, sort_keys=False))

@@ -160,8 +185,24 @@ def pipeline_command(operation: str, pipeline_name: str, pipelines_dir: str, ver

if operation == "drop":
drop = DropCommand(p, **command_kwargs)
fmt.echo("About to drop the following data for the pipeline:")
for k, v in drop.info.items():
fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v))
if drop.is_empty:
fmt.echo("Could not select any resources to drop and no resource/source state to reset. Use the command below to inspect the pipeline:")
fmt.echo(f"dlt pipeline -v {p.pipeline_name} info")
if len(drop.info["warnings"]):
fmt.echo("Additional warnings are available")
for warning in drop.info["warnings"]:
fmt.warning(warning)
return

fmt.echo("About to drop the following data in dataset %s in destination %s:" % (fmt.bold(drop.info["dataset_name"]), fmt.bold(p.destination.__name__)))
fmt.echo("%s: %s" % (fmt.style("Selected schema", fg="green"), drop.info["schema_name"]))
fmt.echo("%s: %s" % (fmt.style("Selected resource(s)", fg="green"), drop.info["resource_names"]))
fmt.echo("%s: %s" % (fmt.style("Table(s) to drop", fg="green"), drop.info["tables"]))
fmt.echo("%s: %s" % (fmt.style("Resource(s) state to reset", fg="green"), drop.info["resource_states"]))
fmt.echo("%s: %s" % (fmt.style("Source state path(s) to reset", fg="green"), drop.info["state_paths"]))
# for k, v in drop.info.items():
# fmt.echo("%s: %s" % (fmt.style(k, fg="green"), v))
for warning in drop.info["warnings"]:
fmt.warning(warning)
if fmt.confirm("Do you want to apply these changes?", default=False):
drop()
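
A hedged sketch of the drop flow above, driven directly from Python rather than via `dlt pipeline <name> drop`. `dlt.attach` and `DropCommand` are the pieces the CLI uses; the pipeline/resource names and the exact keyword arguments are illustrative assumptions based on the argparse wiring in `_dlt.py`.

import dlt
from dlt.pipeline.helpers import DropCommand

p = dlt.attach("chess_pipeline")  # placeholder pipeline name

# kwargs are assumed to mirror the CLI flags: resources, drop_all, state_paths, schema_name
drop = DropCommand(p, resources=["players_games"], state_paths=["archives"])

if drop.is_empty:
    print("nothing selected to drop")
else:
    for warning in drop.info["warnings"]:
        print("WARNING:", warning)
    drop()  # drops destination tables, wipes local load data and resets the selected state
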
5 changes: 3 additions & 2 deletions dlt/common/pipeline.py
@@ -359,11 +359,12 @@ def _delete_source_state_keys(key: TAnyJsonPath, source_state_: Optional[DictStr
delete_matches(key, state_)


def _resource_state(resource_name: str) -> DictStrAny:
def _resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> DictStrAny:
"""Alpha version of the resource state, the signature will change.
Returns resource-scoped state.
"""
return source_state().setdefault('resources', {}).setdefault(resource_name, {}) # type: ignore
state_ = source_state() if source_state_ is None else source_state_
return state_.setdefault('resources', {}).setdefault(resource_name, {}) # type: ignore


def _reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny] = None, /) -> None:
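
For reference, `_resource_state` can now be handed an explicit source-state dict (positional-only, after the `/`); this is what the extended `dlt pipeline <name> info` output uses to count resource state slots. A small illustrative sketch with made-up state content:

from dlt.common.pipeline import _resource_state

source_state = {"resources": {"players": {"last_value": 100}}}
print(_resource_state("players", source_state))  # {'last_value': 100}
print(_resource_state("games", source_state))    # unknown resource: creates and returns {}
# called with one argument it still falls back to the active source_state() context
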
1 change: 0 additions & 1 deletion dlt/common/runtime/collector.py
@@ -299,7 +299,6 @@ class EnlightenCollector(Collector):

def __init__(self, single_bar: bool = False, **enlighten_kwargs: Any) -> None:
"""Collector that uses Enlighten to display counters as progress bars. Set `single_bar` to True to show just the main progress bar. Pass any config to Enlighten in kwargs"""
print("enlighten")
try:
global enlighten

5 changes: 3 additions & 2 deletions dlt/common/schema/utils.py
@@ -446,11 +446,12 @@ def group_tables_by_resource(tables: TSchemaTables, pattern: Optional[REPattern]
"""Create a dict of resources and their associated tables and descendant tables
If `pattern` is supplied, the result is filtered to only resource names matching the pattern.
"""
result = {}
result: Dict[str, List[TTableSchema]] = {}
for table in tables.values():
resource = table.get('resource')
if resource and (pattern is None or pattern.match(resource)):
result[resource] = get_child_tables(tables, table['name'])
resource_tables = result.setdefault(resource, [])
resource_tables.extend(get_child_tables(tables, table['name']))
return result


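
With the `setdefault`/`extend` change, tables coming from the same resource now collect into a single list instead of overwriting each other. A minimal sketch, using hand-made table entries rather than a real `Schema` (only the keys the grouping looks at are filled in):

import re
from dlt.common.schema.utils import group_tables_by_resource

tables = {
    "players": {"name": "players", "resource": "players"},
    "players__scores": {"name": "players__scores", "parent": "players"},
    "games": {"name": "games", "resource": "games"},
}
for resource, group in group_tables_by_resource(tables, re.compile("players")).items():
    print(resource, "->", [t["name"] for t in group])
# players -> ['players', 'players__scores']; "games" is filtered out by the pattern
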
20 changes: 18 additions & 2 deletions dlt/extract/source.py
@@ -224,7 +224,7 @@ def add_limit(self, max_items: int) -> "DltResource": # noqa: A003
"DltResource": returns self
"""
def _gen_wrap(gen: TPipeStep) -> TPipeStep:
"""Wrap a generator to take the first 50 records"""
"""Wrap a generator to take the first `max_items` records"""
nonlocal max_items
count = 0
if inspect.isfunction(gen):
@@ -233,7 +233,7 @@ def _gen_wrap(gen: TPipeStep) -> TPipeStep:
for i in gen: # type: ignore # TODO: help me fix this later
yield i
count += 1
if count > max_items:
if count == max_items:
return
finally:
if inspect.isgenerator(gen):
@@ -606,6 +606,22 @@ def with_resources(self, *resource_names: str) -> "DltSource":
self._resources.select(*resource_names)
return self


def add_limit(self, max_items: int) -> "DltSource": # noqa: A003
"""Adds a limit `max_items` yielded from all selected resources in the source that are not transformers.
This is useful for testing, debugging and generating sample datasets for experimentation. You can easily get your test dataset in a few minutes, when otherwise
you'd need to wait hours for the full loading to complete.
Args:
max_items (int): The maximum number of items to yield
Returns:
"DltSource": returns self
"""
for resource in self.resources.selected.values():
resource.add_limit(max_items)
return self

@property
def run(self) -> SupportsPipelineRun:
"""A convenience method that will call `run` run on the currently active `dlt` pipeline. If pipeline instance is not found, one with default settings will be created."""
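
The new source-level `add_limit` simply forwards the limit to every selected, non-transformer resource, and the `_gen_wrap` fix above makes the resource-level limit stop exactly at `max_items`. A hedged usage sketch; the source, destination and names are placeholders:

import dlt

@dlt.source
def numbers_source():
    @dlt.resource
    def numbers():
        yield from range(10_000)
    return numbers

pipeline = dlt.pipeline(pipeline_name="limit_demo", destination="duckdb", dataset_name="demo")
# only the first 10 items of each selected resource are extracted - handy for quick test runs
pipeline.run(numbers_source().add_limit(10))
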
11 changes: 9 additions & 2 deletions dlt/helpers/streamlit.py
@@ -1,5 +1,5 @@
import sys
from typing import Dict
from typing import Dict, List
import humanize


@@ -226,8 +226,15 @@ def _query_data(query: str, chunk_size: int = None) -> pd.DataFrame:
st.header(table_name)
if "description" in table:
st.text(table["description"])
table_hints: List[str] = []
if "parent" in table:
st.text("Parent table: " + table["parent"])
table_hints.append("parent: **%s**" % table["parent"])
if "resource" in table:
table_hints.append("resource: **%s**" % table["resource"])
if "write_disposition" in table:
table_hints.append("write disposition: **%s**" % table["write_disposition"])

st.markdown(" | ".join(table_hints))

# table schema contains various hints (like clustering or partition options) that we do not want to show in basic view
essentials_f = lambda c: {k:v for k, v in c.items() if k in ["name", "data_type", "nullable"]}
5 changes: 2 additions & 3 deletions dlt/pipeline/__init__.py
@@ -166,7 +166,7 @@ def run(
columns: Sequence[TColumnSchema] = None,
schema: Schema = None
) -> LoadInfo:
"""Loads the data from `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.
"""Loads the data in `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.
### Summary
This method will `extract` the data from the `data` argument, infer the schema, `normalize` the data into a load package (ie. jsonl or PARQUET files representing tables) and then `load` such packages into the `destination`.
@@ -193,7 +193,6 @@ def run(
dataset_name (str, optional):A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files.
If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name`
credentials (Any, optional): Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials.
In most cases should be set to None, which lets `dlt` to use `secrets.toml` or environment variables to infer right credentials values.
@@ -203,7 +202,7 @@ def run(
* `@dlt.resource`: resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care!
* `@dlt.source`: source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into single table.
write_disposition (Literal["skip", "append", "replace"], optional): Controls how to write data to a table. `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. . Defaults to "append".
write_disposition (Literal["skip", "append", "replace", "merge"], optional): Controls how to write data to a table. `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append".
Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten.
columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
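
A hedged example of the newly documented `merge` write disposition; the resource, keys and destination are invented for illustration. `merge` deduplicates and merges rows on the declared `primary_key`/`merge_key` hints instead of appending duplicates:

import dlt

@dlt.resource(primary_key="id", write_disposition="merge")
def items():
    yield [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

pipeline = dlt.pipeline(pipeline_name="merge_demo", destination="duckdb", dataset_name="demo")
pipeline.run(items)
# re-running with changed rows updates them in place rather than adding new copies
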
14 changes: 13 additions & 1 deletion dlt/pipeline/helpers.py
@@ -51,6 +51,7 @@ class _DropInfo(TypedDict):
dataset_name: str
drop_all: bool
resource_pattern: Optional[REPattern]
warnings: List[str]


class DropCommand:
@@ -104,10 +105,19 @@ def __init__(
resource_names=resource_names,
schema_name=self.schema.name, dataset_name=self.pipeline.dataset_name,
drop_all=drop_all,
resource_pattern=self.resource_pattern
resource_pattern=self.resource_pattern,
warnings=[]
)
if self.resource_pattern and not resource_tables:
self.info['warnings'].append(
f"Specified resource(s) {str(resources)} did not select any table(s) in schema {self.schema.name}. Possible resources are: {list(group_tables_by_resource(data_tables).keys())}"
)
self._new_state = self._create_modified_state()

@property
def is_empty(self) -> bool:
return len(self.info['tables']) == 0 and len(self.info["state_paths"]) == 0 and len(self.info["resource_states"]) == 0

def _drop_destination_tables(self) -> None:
with self.pipeline._sql_job_client(self.schema) as client:
client.drop_tables(*[tbl['name'] for tbl in self.tables_to_drop])
@@ -131,6 +141,8 @@ def _create_modified_state(self) -> Dict[str, Any]:
self.info['resource_states'].append(key)
_reset_resource_state(key, source_state)
resolved_paths = resolve_paths(self.state_paths_to_drop, source_state)
if self.state_paths_to_drop and not resolved_paths:
self.info['warnings'].append(f"State paths {self.state_paths_to_drop} did not select any paths in source {source_name}")
_delete_source_state_keys(resolved_paths, source_state)
self.info['state_paths'].extend(f"{source_name}.{p}" for p in resolved_paths)
return state # type: ignore[return-value]
