
Commit

Merge pull request #1682 from dlt-hub/devel
master merge for 0.5.3 release
rudolfix authored Aug 13, 2024
2 parents e00baa0 + 5795b14 commit 19c41ea
Showing 93 changed files with 3,941 additions and 2,461 deletions.
80 changes: 80 additions & 0 deletions .github/workflows/test_destination_motherduck.yml
@@ -0,0 +1,80 @@

name: dest | motherduck

on:
  pull_request:
    branches:
      - master
      - devel
  workflow_dispatch:
  schedule:
    - cron: '0 2 * * *'

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

env:
  DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

  RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
  RUNTIME__LOG_LEVEL: ERROR
  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}

  ACTIVE_DESTINATIONS: "[\"motherduck\"]"
  ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
  get_docs_changes:
    name: docs changes
    uses: ./.github/workflows/get_docs_changes.yml
    if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

  run_loader:
    name: dest | motherduck tests
    needs: get_docs_changes
    if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
    defaults:
      run:
        shell: bash
    runs-on: "ubuntu-latest"

    steps:

      - name: Check out
        uses: actions/checkout@master

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"

      - name: Install Poetry
        uses: snok/[email protected]
        with:
          virtualenvs-create: true
          virtualenvs-in-project: true
          installer-parallel: true

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v3
        with:
          path: .venv
          key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-motherduck

      - name: Install dependencies
        run: poetry install --no-interaction -E motherduck -E s3 -E gs -E az -E parquet --with sentry-sdk --with pipeline

      - name: create secrets.toml
        run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

      - run: |
          poetry run pytest tests/load -m "essential"
        name: Run essential tests Linux
        if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}
      - run: |
          poetry run pytest tests/load
        name: Run all tests Linux
        if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
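
ACTIVE_DESTINATIONS and ALL_FILESYSTEM_DRIVERS are JSON-encoded lists that the test suite reads to narrow which destinations and filesystem drivers are exercised. A minimal sketch of how such an environment variable can be decoded (illustrative only, not the repository's actual test utilities):

import json
import os

# the workflow above exports ACTIVE_DESTINATIONS='["motherduck"]'
active_destinations = json.loads(os.environ.get("ACTIVE_DESTINATIONS", "[]"))

if "motherduck" in active_destinations:
    print("motherduck load tests are enabled for this run")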
2 changes: 1 addition & 1 deletion .github/workflows/test_destinations.yml
@@ -28,7 +28,7 @@ env:
  RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
  # Test redshift and filesystem with all buckets
  # postgres runs again here so we can test on mac/windows
  ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\", \"motherduck\"]"
  ACTIVE_DESTINATIONS: "[\"redshift\", \"postgres\", \"duckdb\", \"filesystem\", \"dummy\"]"

jobs:
get_docs_changes:
185 changes: 127 additions & 58 deletions dlt/common/destination/reference.py
@@ -27,6 +27,7 @@
from dlt.common import logger
from dlt.common.configuration.specs.base_configuration import extract_inner_hint
from dlt.common.destination.utils import verify_schema_capabilities
from dlt.common.exceptions import TerminalValueError
from dlt.common.normalizers.naming import NamingConvention
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.utils import (
@@ -42,6 +43,8 @@
    InvalidDestinationReference,
    UnknownDestinationModule,
    DestinationSchemaTampered,
    DestinationTransientException,
    DestinationTerminalException,
)
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
@@ -187,6 +190,8 @@ class DestinationClientDwhConfiguration(DestinationClientConfiguration):
    """How to handle replace disposition for this destination, can be classic or staging"""
    staging_dataset_name_layout: str = "%s_staging"
    """Layout for staging dataset, where %s is replaced with dataset name. placeholder is optional"""
    enable_dataset_name_normalization: bool = True
    """Whether to normalize the dataset name. Affects staging dataset as well."""

    def _bind_dataset_name(
        self: TDestinationDwhClient, dataset_name: str, default_schema_name: str = None
@@ -205,11 +210,14 @@ def normalize_dataset_name(self, schema: Schema) -> str:
        If default schema name is None or equals schema.name, the schema suffix is skipped.
        """
        dataset_name = self._make_dataset_name(schema.name)
        return (
            dataset_name
            if not dataset_name
            else schema.naming.normalize_table_identifier(dataset_name)
        )
        if not dataset_name:
            return dataset_name
        else:
            return (
                schema.naming.normalize_table_identifier(dataset_name)
                if self.enable_dataset_name_normalization
                else dataset_name
            )

    def normalize_staging_dataset_name(self, schema: Schema) -> str:
        """Builds staging dataset name out of dataset_name and staging_dataset_name_layout."""
@@ -224,7 +232,11 @@ def normalize_staging_dataset_name(self, schema: Schema) -> str:
            # no placeholder, then layout is a full name. so you can have a single staging dataset
            dataset_name = self.staging_dataset_name_layout

        return schema.naming.normalize_table_identifier(dataset_name)
        return (
            schema.naming.normalize_table_identifier(dataset_name)
            if self.enable_dataset_name_normalization
            else dataset_name
        )

    def _make_dataset_name(self, schema_name: str) -> str:
        if not schema_name:
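
To illustrate the new enable_dataset_name_normalization flag (a minimal sketch under assumptions, not code from this commit): with the default value of True the configured dataset name is passed through the schema's naming convention, while False keeps it verbatim. The pipeline and dataset names below are hypothetical.

import dlt

# hypothetical pipeline; "MyRawData" is an illustrative dataset name
pipeline = dlt.pipeline(
    pipeline_name="demo",
    destination="duckdb",
    dataset_name="MyRawData",
)
# default (enable_dataset_name_normalization = true): the snake_case naming convention
# creates the dataset as "my_raw_data"; with the flag set to false in the destination
# configuration the dataset is created as "MyRawData", exactly as given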
@@ -258,11 +270,45 @@ class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfigura
    """configuration of the staging, if present, injected at runtime"""


TLoadJobState = Literal["running", "failed", "retry", "completed"]
TLoadJobState = Literal["ready", "running", "failed", "retry", "completed"]


class LoadJob(ABC):
    """
    A stateful load job, represents one job file
    """

    def __init__(self, file_path: str) -> None:
        self._file_path = file_path
        self._file_name = FileStorage.get_file_name_from_file_path(file_path)
        # NOTE: we only accept a full filepath in the constructor
        assert self._file_name != self._file_path
        self._parsed_file_name = ParsedLoadJobFileName.parse(self._file_name)

    def job_id(self) -> str:
        """The job id that is derived from the file name and does not changes during job lifecycle"""
        return self._parsed_file_name.job_id()

    def file_name(self) -> str:
        """A name of the job file"""
        return self._file_name

    def job_file_info(self) -> ParsedLoadJobFileName:
        return self._parsed_file_name

    @abstractmethod
    def state(self) -> TLoadJobState:
        """Returns current state. Should poll external resource if necessary."""
        pass

    @abstractmethod
    def exception(self) -> str:
        """The exception associated with failed or retry states"""
        pass


class LoadJob:
    """Represents a job that loads a single file
class RunnableLoadJob(LoadJob, ABC):
    """Represents a runnable job that loads a single file
    Each job starts in "running" state and ends in one of terminal states: "retry", "failed" or "completed".
    Each job is uniquely identified by a file name. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
@@ -273,75 +319,95 @@ class LoadJob:
    immediately transition job into "failed" or "retry" state respectively.
    """

    def __init__(self, file_name: str) -> None:
    def __init__(self, file_path: str) -> None:
        """
        File name is also a job id (or job id is deterministically derived) so it must be globally unique
        """
        # ensure file name
        assert file_name == FileStorage.get_file_name_from_file_path(file_name)
        self._file_name = file_name
        self._parsed_file_name = ParsedLoadJobFileName.parse(file_name)
        super().__init__(file_path)
        self._state: TLoadJobState = "ready"
        self._exception: Exception = None

    @abstractmethod
    def state(self) -> TLoadJobState:
        """Returns current state. Should poll external resource if necessary."""
        pass
        # variables needed by most jobs, set by the loader in set_run_vars
        self._schema: Schema = None
        self._load_table: TTableSchema = None
        self._load_id: str = None
        self._job_client: "JobClientBase" = None

    def file_name(self) -> str:
        """A name of the job file"""
        return self._file_name
    def set_run_vars(self, load_id: str, schema: Schema, load_table: TTableSchema) -> None:
        """
        called by the loader right before the job is run
        """
        self._load_id = load_id
        self._schema = schema
        self._load_table = load_table

    def job_id(self) -> str:
        """The job id that is derived from the file name and does not changes during job lifecycle"""
        return self._parsed_file_name.job_id()
    @property
    def load_table_name(self) -> str:
        return self._load_table["name"]

    def job_file_info(self) -> ParsedLoadJobFileName:
        return self._parsed_file_name
    def run_managed(
        self,
        job_client: "JobClientBase",
    ) -> None:
        """
        wrapper around the user implemented run method
        """
        # only jobs that are not running or have not reached a final state
        # may be started
        assert self._state in ("ready", "retry")
        self._job_client = job_client

        # filepath is now moved to running
        try:
            self._state = "running"
            self._job_client.prepare_load_job_execution(self)
            self.run()
            self._state = "completed"
        except (DestinationTerminalException, TerminalValueError) as e:
            self._state = "failed"
            self._exception = e
        except (DestinationTransientException, Exception) as e:
            self._state = "retry"
            self._exception = e
        finally:
            # sanity check
            assert self._state in ("completed", "retry", "failed")

    @abstractmethod
    def run(self) -> None:
        """
        run the actual job, this will be executed on a thread and should be implemented by the user
        exception will be handled outside of this function
        """
        raise NotImplementedError()

    def state(self) -> TLoadJobState:
        """Returns current state. Should poll external resource if necessary."""
        return self._state

    def exception(self) -> str:
        """The exception associated with failed or retry states"""
        pass
        return str(self._exception)
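
A minimal sketch of a destination-side job built on this lifecycle (hypothetical class name, assuming only the API shown above): the loader constructs the job with a file path, calls set_run_vars() and then run_managed(), which moves the state from "ready" to "running" and finally to "completed", "failed" or "retry".

from dlt.common.destination.reference import RunnableLoadJob


class PrintSizeLoadJob(RunnableLoadJob):  # hypothetical example job
    def run(self) -> None:
        # _file_path is set in the constructor; _load_table, _schema and _load_id are
        # populated by the loader via set_run_vars() before run_managed() invokes run()
        with open(self._file_path, "rb") as f:
            size = len(f.read())
        print(f"loaded {size} bytes into table {self.load_table_name}")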


class NewLoadJob(LoadJob):
    """Adds a trait that allows to save new job file"""
class FollowupJob:
    """Base class for follow up jobs that should be created"""

    @abstractmethod
    def new_file_path(self) -> str:
        """Path to a newly created temporary job file. If empty, no followup job should be created"""
        pass


class FollowupJob:
    """Adds a trait that allows to create a followup job"""
class HasFollowupJobs:
    """Adds a trait that allows to create single or table chain followup jobs"""

    def create_followup_jobs(self, final_state: TLoadJobState) -> List[NewLoadJob]:
    def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
        """Return list of new jobs. `final_state` is state to which this job transits"""
        return []
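
As a sketch of how the renamed trait is meant to be combined with a job (an assumption, not part of this commit, with a hypothetical class name), a class can mix HasFollowupJobs into a RunnableLoadJob and emit extra work only when it reaches the "completed" state.

from typing import List

from dlt.common.destination.reference import (
    FollowupJob,
    HasFollowupJobs,
    RunnableLoadJob,
    TLoadJobState,
)


class StageAndMergeLoadJob(RunnableLoadJob, HasFollowupJobs):  # hypothetical
    def run(self) -> None:
        ...  # load the file into a staging table

    def create_followup_jobs(self, final_state: TLoadJobState) -> List[FollowupJob]:
        if final_state != "completed":
            return []
        # a real implementation would return FollowupJob instances that merge the
        # staged data; returning an empty list keeps the sketch self-contained
        return []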


class DoNothingJob(LoadJob):
    """The most lazy class of dlt"""

    def __init__(self, file_path: str) -> None:
        super().__init__(FileStorage.get_file_name_from_file_path(file_path))

    def state(self) -> TLoadJobState:
        # this job is always done
        return "completed"

    def exception(self) -> str:
        # this part of code should be never reached
        raise NotImplementedError()


class DoNothingFollowupJob(DoNothingJob, FollowupJob):
    """The second most lazy class of dlt"""

    pass


class JobClientBase(ABC):
    def __init__(
        self,
@@ -394,13 +460,16 @@ def update_stored_schema(
        return expected_update

    @abstractmethod
    def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
        """Creates and starts a load job for a particular `table` with content in `file_path`"""
    def create_load_job(
        self, table: TTableSchema, file_path: str, load_id: str, restore: bool = False
    ) -> LoadJob:
        """Creates a load job for a particular `table` with content in `file_path`"""
        pass

    @abstractmethod
    def restore_file_load(self, file_path: str) -> LoadJob:
        """Finds and restores already started loading job identified by `file_path` if destination supports it."""
    def prepare_load_job_execution(  # noqa: B027, optional override
        self, job: RunnableLoadJob
    ) -> None:
        """Prepare the connected job client for the execution of a load job (used for query tags in sql clients)"""
        pass

    def should_truncate_table_before_load(self, table: TTableSchema) -> bool:
@@ -410,7 +479,7 @@ def create_table_chain_completed_followup_jobs(
        self,
        table_chain: Sequence[TTableSchema],
        completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None,
    ) -> List[NewLoadJob]:
    ) -> List[FollowupJob]:
        """Creates a list of followup jobs that should be executed after a table chain is completed"""
        return []

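Taken together, a rough sketch of how a loader could drive the new API (an illustrative helper under assumptions, not code from this commit; run_single_job is a hypothetical function name):

from dlt.common.destination.reference import JobClientBase, LoadJob, RunnableLoadJob
from dlt.common.schema import Schema, TTableSchema


def run_single_job(
    client: JobClientBase, schema: Schema, table: TTableSchema, file_path: str, load_id: str
) -> LoadJob:
    # create_load_job() replaces start_file_load()/restore_file_load(); jobs start in "ready"
    job = client.create_load_job(table, file_path, load_id)
    if isinstance(job, RunnableLoadJob):
        job.set_run_vars(load_id, schema, table)
        # run_managed() calls prepare_load_job_execution() and run(), mapping terminal and
        # transient destination exceptions to the "failed" and "retry" states respectively
        job.run_managed(client)
    return job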