
Merge pull request #534 from dlt-hub/rfix/incremental-external-schedule
incremental external schedule
rudolfix authored Aug 6, 2023
2 parents a385206 + 60a4fac commit 5e903b2
Showing 46 changed files with 1,109 additions and 289 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -58,7 +58,7 @@ test:
(set -a && . tests/.env && poetry run pytest tests)

test-local:
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(not redshift and not bigquery)'
DESTINATION__POSTGRES__CREDENTIALS=postgresql://loader:loader@localhost:5432/dlt_data DESTINATION__DUCKDB__CREDENTIALS=duckdb:///_storage/test_quack.duckdb poetry run pytest tests -k '(postgres and duckdb)'

test-common:
poetry run pytest tests/common tests/normalize tests/extract tests/pipeline tests/reflection tests/sources tests/cli/common
17 changes: 11 additions & 6 deletions dlt/cli/deploy_command.py
@@ -197,16 +197,21 @@ def _make_modification(self) -> None:

# save cloudbuild.yaml only if it does not exist, to allow running the deploy command for many different pipelines
dest_cloud_build = os.path.join(utils.AIRFLOW_BUILD_FOLDER, AIRFLOW_CLOUDBUILD_YAML)
if not os.path.exists(dest_cloud_build):
if not self.repo_storage.has_file(dest_cloud_build):
self.repo_storage.save(
dest_cloud_build,
self.artifacts["cloudbuild_file"]
)
)
else:
fmt.warning(f"{AIRFLOW_CLOUDBUILD_YAML} already created. Delete the file and run the deploy command again to re-create.")

dest_dag_script = os.path.join(utils.AIRFLOW_DAGS_FOLDER, self.artifacts["dag_script_name"])
self.repo_storage.save(
os.path.join(utils.AIRFLOW_DAGS_FOLDER, self.artifacts["dag_script_name"]),
dest_dag_script,
self.artifacts["dag_file"]
)


def _echo_instructions(self, *args: Optional[Any]) -> None:
fmt.echo("Your %s deployment for pipeline %s is ready!" % (
fmt.bold(self.deployment_method), fmt.bold(self.state["pipeline_name"]),
@@ -219,10 +224,10 @@ def _echo_instructions(self, *args: Optional[Any]) -> None:
))
fmt.echo()

fmt.echo("You must prepare your repository first:")
fmt.echo("You must prepare your DAG first:")

fmt.echo("1. Import your sources in %s, change default_args if necessary." % (fmt.bold(self.artifacts["dag_script_name"])))
fmt.echo("2. Run airflow pipeline locally.\nSee Airflow getting started: %s" % (fmt.bold(AIRFLOW_GETTING_STARTED)))
fmt.echo("1. Import your sources in %s, configure the DAG ans tasks as needed." % (fmt.bold(self.artifacts["dag_script_name"])))
fmt.echo("2. Test the DAG with Airflow locally .\nSee Airflow getting started: %s" % (fmt.bold(AIRFLOW_GETTING_STARTED)))
fmt.echo()

fmt.echo("If you are planning run the pipeline with Google Cloud Composer, follow the next instructions:\n")
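The switch from os.path.exists to self.repo_storage.has_file presumably matters because dest_cloud_build is a path resolved against the repository storage root rather than the process working directory. Below is a minimal sketch of the resulting write-once pattern, assuming repo_storage is a dlt FileStorage exposing has_file()/save() as used above; save_once is a hypothetical helper, not part of the diff.

from dlt.common.storages import FileStorage

def save_once(storage: FileStorage, relative_path: str, content: str) -> bool:
    """Write `content` under `relative_path` only if nothing is there yet; return True if written."""
    if storage.has_file(relative_path):  # resolved against the storage root, not the CWD
        return False
    storage.save(relative_path, content)
    return True

This keeps cloudbuild.yaml from being overwritten when the deploy command is run again for another pipeline in the same repository, which matches the warning emitted above.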
30 changes: 23 additions & 7 deletions dlt/cli/deploy_command_helpers.py
@@ -11,13 +11,14 @@

import dlt

from dlt.common import git
from dlt.common.configuration.exceptions import LookupTrace
from dlt.common.configuration.providers import ConfigTomlProvider, EnvironProvider
from dlt.common.git import get_origin, get_repo, Repo
from dlt.common.configuration.specs.run_configuration import get_default_pipeline_name
from dlt.common.typing import StrAny
from dlt.common.reflection.utils import evaluate_node_literal
from dlt.common.pipeline import LoadInfo, TPipelineState
from dlt.common.pipeline import LoadInfo, TPipelineState, get_dlt_repos_dir
from dlt.common.storages import FileStorage
from dlt.common.utils import set_working_dir

@@ -84,7 +85,7 @@ def _prepare_deployment(self) -> None:
# validate schedule
self.schedule_description = self._get_schedule_description()
fmt.echo("Looking up the deployment template scripts in %s...\n" % fmt.bold(self.repo_location))
self.template_storage = utils.clone_command_repo(self.repo_location, self.branch)
self.template_storage = git.get_fresh_repo_files(self.repo_location, get_dlt_repos_dir(), branch=self.branch)
self.working_directory = os.path.split(self.pipeline_script_path)[0]

def _get_schedule_description(self) -> Optional[Any]:
@@ -106,7 +107,19 @@ def run_deployment(self) -> None:
self._prepare_deployment()
# go through all once launched pipelines
visitors = get_visitors(self.pipeline_script, self.pipeline_script_path)
pipeline_name, pipelines_dir = parse_pipeline_info(visitors)
possible_pipelines = parse_pipeline_info(visitors)
pipeline_name: str = None
pipelines_dir: str = None

uniq_possible_pipelines = {t[0]:t for t in possible_pipelines}
if len(uniq_possible_pipelines) == 1:
pipeline_name, pipelines_dir = possible_pipelines[0]
elif len(uniq_possible_pipelines) > 1:
choices = list(uniq_possible_pipelines.keys())
choices_str = "".join([str(i+1) for i in range(len(choices))])
choices_selection = [f"{idx+1}-{name}" for idx, name in enumerate(choices)]
sel = fmt.prompt("Several pipelines found in script, please select one: " + ", ".join(choices_selection), choices=choices_str)
pipeline_name, pipelines_dir = uniq_possible_pipelines[choices[int(sel) - 1]]

if pipelines_dir:
self.pipelines_dir = os.path.abspath(pipelines_dir)
@@ -121,6 +134,7 @@
if not self.pipeline_name:
self.pipeline_name = get_default_pipeline_name(self.pipeline_script_path)
fmt.warning(f"Using default pipeline name {self.pipeline_name}. The pipeline name is not passed as argument to dlt.pipeline nor configured via config provides ie. config.toml")
# fmt.echo("Generating deployment for pipeline %s" % fmt.bold(self.pipeline_name))

# attach to pipeline name, get state and trace
pipeline = dlt.attach(pipeline_name=self.pipeline_name, pipelines_dir=self.pipelines_dir)
@@ -199,18 +213,19 @@ def get_visitors(pipeline_script: str, pipeline_script_path: str) -> PipelineScr
return visitor


def parse_pipeline_info(visitor: PipelineScriptVisitor) -> Tuple[Optional[str], Optional[str]]:
pipeline_name, pipelines_dir = None, None
def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optional[str]]]:
pipelines: List[Tuple[str, Optional[str]]] = []
if n.PIPELINE in visitor.known_calls:
for call_args in visitor.known_calls[n.PIPELINE]:
pipeline_name, pipelines_dir = None, None
f_r_node = call_args.arguments.get("full_refresh")
if f_r_node:
f_r_value = evaluate_node_literal(f_r_node)
if f_r_value is None:
fmt.warning(f"The value of `full_refresh` in call to `dlt.pipeline` cannot be determined from {unparse(f_r_node).strip()}. We assume that you know what you are doing :)")
if f_r_value is True:
if fmt.confirm("The value of 'full_refresh' is set to True. Do you want to abort to set it to False?", default=True):
return None, None
return pipelines

p_d_node = call_args.arguments.get("pipelines_dir")
if p_d_node:
@@ -223,8 +238,9 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> Tuple[Optional[str],
pipeline_name = evaluate_node_literal(p_n_node)
if pipeline_name is None:
raise CliCommandException("deploy", f"The value of 'pipeline_name' argument in call to `dlt_pipeline` cannot be determined from {unparse(p_d_node).strip()}. Pipeline working dir will be found. Pass it directly with --pipeline-name option.")
pipelines.append((pipeline_name, pipelines_dir))

return pipeline_name, pipelines_dir
return pipelines


def str_representer(dumper: yaml.Dumper, data: str) -> yaml.ScalarNode:
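With parse_pipeline_info now returning every dlt.pipeline() call found in the script, run_deployment de-duplicates the candidates by pipeline name and prompts when more than one distinct name remains. The sketch below reproduces that selection flow in isolation; prompt stands in for fmt.prompt and the sample pipeline names in the usage comment are hypothetical.

from typing import Callable, List, Optional, Tuple

def select_pipeline(
    possible_pipelines: List[Tuple[str, Optional[str]]],
    prompt: Callable[[str], str] = input,
) -> Tuple[Optional[str], Optional[str]]:
    # de-duplicate by pipeline name, keeping the last (name, pipelines_dir) pair per name
    uniq = {name: (name, pipelines_dir) for name, pipelines_dir in possible_pipelines}
    if len(uniq) == 1:
        return possible_pipelines[0]
    if len(uniq) > 1:
        choices = list(uniq)
        menu = ", ".join(f"{idx + 1}-{name}" for idx, name in enumerate(choices))
        selection = prompt("Several pipelines found in script, please select one: " + menu)
        return uniq[choices[int(selection) - 1]]
    return None, None

# e.g. select_pipeline([("github_events", None), ("chess_pipeline", "/tmp/pipelines")]) prompts with 1-github_events, 2-chess_pipeline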
7 changes: 0 additions & 7 deletions dlt/cli/utils.py
@@ -25,13 +25,6 @@
MODULE_INIT = "__init__.py"


def clone_command_repo(repo_location: str, branch: str) -> FileStorage:
template_dir = tempfile.mkdtemp()
# TODO: handle ImportError (no git command available) gracefully
with git.clone_repo(repo_location, template_dir, branch=branch):
return FileStorage(template_dir)


def parse_init_script(command: str, script_source: str, init_script_name: str) -> PipelineScriptVisitor:
# parse the script first
tree = ast.parse(source=script_source)
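clone_command_repo (a throw-away tempfile clone) is dropped in favor of git.get_fresh_repo_files, which the deploy helper now calls with the dlt repos dir so template repositories can be reused between runs. A hedged sketch of that call, mirroring the usage shown in deploy_command_helpers.py above; the repository URL is a placeholder and the returned object is assumed to be the same kind of FileStorage that clone_command_repo used to return.

from dlt.common import git
from dlt.common.pipeline import get_dlt_repos_dir

# refresh (or clone on first use) the deploy template repo into dlt's local repos dir
template_storage = git.get_fresh_repo_files(
    "https://github.com/dlt-hub/dlt-deploy-template",  # placeholder repo location
    get_dlt_repos_dir(),
    branch=None,  # default branch
)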
47 changes: 33 additions & 14 deletions dlt/common/configuration/specs/aws_credentials.py
@@ -1,8 +1,9 @@
from typing import Optional, TYPE_CHECKING, Dict, Any
from typing import Optional, Dict, Any

from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import TSecretStrValue
from dlt.common.configuration.specs import CredentialsConfiguration, CredentialsWithDefault, configspec
from dlt.common.configuration.specs.exceptions import InvalidBoto3Session
from dlt import version


@@ -12,22 +13,21 @@ class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
aws_access_key_id: str = None
aws_secret_access_key: TSecretStrValue = None
aws_session_token: Optional[TSecretStrValue] = None
aws_profile: Optional[str] = None
profile_name: Optional[str] = None
region_name: Optional[str] = None

def to_s3fs_credentials(self) -> Dict[str, Optional[str]]:
"""Dict of keyword arguments that can be passed to s3fs"""
return dict(
key=self.aws_access_key_id,
secret=self.aws_secret_access_key,
token=self.aws_session_token,
profile=self.aws_profile
profile=self.profile_name
)

def to_native_representation(self) -> Dict[str, Optional[str]]:
"""Return a dict that can be passed as kwargs to boto3 session"""
d = dict(self)
d['profile_name'] = d.pop('aws_profile') # boto3 argument doesn't match env var name
return d
return dict(self)


@configspec
@@ -36,14 +36,7 @@ class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):
def on_partial(self) -> None:
# Try get default credentials
session = self._to_session()
self.aws_profile = session.profile_name
default = session.get_credentials()
if not default:
return None
self.aws_access_key_id = default.access_key
self.aws_secret_access_key = default.secret_key
self.aws_session_token = default.token
if not self.is_partial():
if self._from_session(session) and not self.is_partial():
self.resolve()

def _to_session(self) -> Any:
@@ -53,5 +46,31 @@ def _to_session(self) -> Any:
raise MissingDependencyException(self.__class__.__name__, [f"{version.DLT_PKG_NAME}[s3]"])
return boto3.Session(**self.to_native_representation())

def _from_session(self, session: Any) -> Any:
"""Sets the credentials properties from boto3 `session` and return session's credentials if found"""
import boto3
assert isinstance(session, boto3.Session)
self.profile_name = session.profile_name
self.region_name = session.region_name
default = session.get_credentials()
if not default:
return None
self.aws_access_key_id = default.access_key
self.aws_secret_access_key = default.secret_key
self.aws_session_token = default.token
return default

def to_native_credentials(self) -> Optional[Any]:
return self._to_session().get_credentials()

def parse_native_representation(self, native_value: Any) -> None:
"""Import external boto session"""
try:
import boto3
if isinstance(native_value, boto3.Session):
if self._from_session(native_value):
self.__is_resolved__ = True
else:
raise InvalidBoto3Session(self.__class__, native_value)
except ImportError:
pass
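parse_native_representation lets an already configured boto3.Session be injected as dlt AWS credentials, with _from_session copying access key, secret, session token, profile and region. A hedged usage sketch follows; it requires boto3, the profile name is made up, and is_resolved() is assumed from the base configuration spec.

import boto3
from dlt.common.configuration.specs.aws_credentials import AwsCredentials

session = boto3.Session(profile_name="my-aws-profile")  # hypothetical profile
credentials = AwsCredentials()
credentials.parse_native_representation(session)  # copies key, secret, token, profile and region

if credentials.is_resolved():
    s3fs_kwargs = credentials.to_s3fs_credentials()  # dict with key/secret/token/profile for s3fs

Note that InvalidBoto3Session (added in exceptions.py below) is raised only when the native value is not a boto3.Session at all; a session without credentials simply leaves the spec unresolved.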
6 changes: 6 additions & 0 deletions dlt/common/configuration/specs/exceptions.py
@@ -42,3 +42,9 @@ class InvalidGoogleOauth2Json(NativeValueError):
def __init__(self, spec: Type[Any], native_value: Any):
msg = f"The expected representation for {spec.__name__} is a string with serialized oauth2 user info and may be wrapped in 'install'/'web' node - depending of oauth2 app type."
super().__init__(spec, native_value, msg)


class InvalidBoto3Session(NativeValueError):
def __init__(self, spec: Type[Any], native_value: Any):
msg = f"The expected representation for {spec.__name__} is and instance of boto3.Session containing credentials"
super().__init__(spec, native_value, msg)
29 changes: 23 additions & 6 deletions dlt/common/data_types/type_helpers.py
@@ -2,14 +2,14 @@
import base64
import datetime # noqa: I251
from collections.abc import Mapping as C_Mapping, Sequence as C_Sequence
from typing import Any, Type, Literal, Union, Optional, cast
from typing import Any, Type, Literal, Union, cast

from dlt.common import pendulum, json, Decimal, Wei
from dlt.common.json import custom_pua_remove
from dlt.common.json._simplejson import custom_encode as json_custom_encode
from dlt.common.arithmetics import InvalidOperation
from dlt.common.data_types.typing import TDataType
from dlt.common.time import parse_iso_like_datetime, ensure_datetime, ensure_date
from dlt.common.time import ensure_pendulum_datetime, parse_iso_like_datetime, ensure_pendulum_date
from dlt.common.utils import map_nested_in_place, str2bool


@@ -57,7 +57,7 @@ def complex_to_str(value: Any) -> str:
return json.dumps(map_nested_in_place(custom_pua_remove, value))


def coerce_date_types(
def coerce_to_date_types(
to_type: Literal["timestamp", "date"], from_type: TDataType, value: Any
) -> Union[datetime.datetime, datetime.date]:
result: Union[datetime.datetime, datetime.date]
@@ -89,8 +89,25 @@ def coerce_date_types(
raise ValueError(value)
# Pendulum ISO parsing may result in either datetime or date
if to_type == "date":
return ensure_date(result)
return ensure_datetime(result)
return ensure_pendulum_date(result)
return ensure_pendulum_datetime(result)


def coerce_from_date_types(
to_type: TDataType, value: datetime.datetime
) -> Union[datetime.datetime, datetime.date, int, float, str]:
v = ensure_pendulum_datetime(value)
if to_type == "timestamp":
return v
if to_type == "text":
return v.isoformat()
if to_type == "bigint":
return v.int_timestamp # type: ignore
if to_type == "double":
return v.timestamp() # type: ignore
if to_type == "date":
return ensure_pendulum_date(v)
raise TypeError(f"Cannot convert timestamp to {to_type}")


def coerce_value(to_type: TDataType, from_type: TDataType, value: Any) -> Any:
@@ -165,7 +182,7 @@ def coerce_value(to_type: TDataType, from_type: TDataType, value: Any) -> Any:
raise ValueError(trim_value)

if to_type in ["timestamp", "date"]:
return coerce_date_types(cast(Literal["timestamp", "date"], to_type), from_type, value)
return coerce_to_date_types(cast(Literal["timestamp", "date"], to_type), from_type, value)

if to_type == "bool":
if from_type == "text":
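coerce_date_types is renamed to coerce_to_date_types, and a new reverse helper, coerce_from_date_types, converts a datetime into the requested column type. A small illustrative sketch (not part of the diff); the epoch value assumes the timestamp is interpreted in UTC.

import datetime
from dlt.common.data_types.type_helpers import coerce_from_date_types

dt = datetime.datetime(2023, 8, 6, 8, 30, tzinfo=datetime.timezone.utc)
assert coerce_from_date_types("bigint", dt) == 1691310600  # integer epoch seconds
assert coerce_from_date_types("text", dt).startswith("2023-08-06T08:30:00")
assert str(coerce_from_date_types("date", dt)) == "2023-08-06"
# "timestamp" returns the value as a timezone-aware pendulum datetime; other types raise TypeError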
61 changes: 47 additions & 14 deletions dlt/common/time.py
@@ -3,7 +3,7 @@
import datetime # noqa: I251

from dlt.common.pendulum import pendulum, timedelta
from dlt.common.typing import TimedeltaSeconds
from dlt.common.typing import TimedeltaSeconds, TAnyDateTime
from pendulum.parsing import parse_iso8601, _parse_common as parse_datetime_common
from pendulum.tz import UTC

@@ -26,7 +26,7 @@ def timestamp_before(timestamp: float, max_inclusive: Optional[float]) -> bool:
return timestamp <= (max_inclusive or FUTURE_TIMESTAMP)


def parse_iso_like_datetime(value: Any) -> pendulum.DateTime:
def parse_iso_like_datetime(value: Any) -> Union[pendulum.DateTime, pendulum.Date]:
# we use internal pendulum parse function. the generic function, for example, parses string "now" as now()
# it also tries to parse ISO intervals but the code is very low quality

@@ -54,21 +54,29 @@ def parse_iso_like_datetime(value: Any) -> pendulum.DateTime:
return dtv # type: ignore


def ensure_datetime(value: Union[datetime.datetime, datetime.date]) -> datetime.datetime:
"""
Convert `date` to `datetime` if needed
"""
if isinstance(value, datetime.datetime):
return value
return pendulum.datetime(
value.year, value.month, value.day, tz=UTC
)
def ensure_pendulum_date(value: TAnyDateTime) -> pendulum.Date:
"""Coerce a date/time value to a `pendulum.Date` object.
UTC is assumed if the value is not timezone aware. Other timezones are shifted to UTC
def ensure_date(value: Union[datetime.datetime, datetime.date]) -> datetime.date:
Args:
value: The value to coerce. Can be a pendulum.DateTime, pendulum.Date, datetime, date or iso date/time str.
Returns:
A timezone aware pendulum.Date object.
"""
if isinstance(value, datetime.datetime):
return value.date()
return value
# both py datetime and pendulum datetime are handled here
value = pendulum.instance(value)
return value.in_tz(UTC).date() # type: ignore
elif isinstance(value, datetime.date):
return pendulum.date(value.year, value.month, value.day)
elif isinstance(value, str):
result = parse_iso_like_datetime(value)
if isinstance(result, pendulum.DateTime):
return result.in_tz(UTC).date() # type: ignore
return pendulum.datetime(result.year, result.month, result.day)
raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")


@overload
@@ -85,3 +93,28 @@ def to_seconds(td: Optional[TimedeltaSeconds]) -> Optional[float]:
if isinstance(td, timedelta):
return td.total_seconds()
return td


def ensure_pendulum_datetime(value: TAnyDateTime) -> pendulum.DateTime:
"""Coerce a date/time value to a `pendulum.DateTime` object.
UTC is assumed if the value is not timezone aware. Other timezones are shifted to UTC
Args:
value: The value to coerce. Can be a pendulum.DateTime, pendulum.Date, datetime, date or iso date/time str.
Returns:
A timezone aware pendulum.DateTime object in UTC timezone.
"""
if isinstance(value, datetime.datetime):
# both py datetime and pendulum datetime are handled here
ret = pendulum.instance(value)
return ret.in_tz(UTC)
elif isinstance(value, datetime.date):
return pendulum.datetime(value.year, value.month, value.day, tz=UTC)
elif isinstance(value, str):
result = parse_iso_like_datetime(value)
if isinstance(result, pendulum.DateTime):
return result.in_tz(UTC)
return pendulum.datetime(result.year, result.month, result.day, tz=UTC)
raise TypeError(f"Cannot coerce {value} to a pendulum.DateTime object.")