375 filesystem storage staging (#451)
* add creation of reference followup jobs

* copy job

* add parquet format

* make staging destination none by default

* add automatic resolving of correct file format

* renaming staging destination to staging

* refactor redshift job and inject fs config info

* small cleanup and full parquet file loading test

* add redshift test

* fix resuming existing jobs

* linter fixes and something else I forgot

* move reference job follow up creation into job

* add bigquery staging with gcs

* add jsonl loading for bigquery staging

* better supported file format resolution

* move to existing bigquery load job

* change pipeline args order

* add staging run arg

* some more pipeline fixes

* configure staging via config

* enhance staging load tests

* fix merge disposition on redshift

* add comprehensive staging tests

* fix redshift jsonl loading

* add doc page (not in hierarchy for now)

* move redshift credentials testing to redshift loading location

* change rotation test

* change timing test

* implement snowflake file staging

* switch to staging instead of integration

* add s3 stage (which does not currently work)

* filter out certain combinations for tests

* update docs for snowflake staging

* forward staging config to supported destination configuration types

* move boto out of staging credentials

* authentication to s3 with iam role for redshift

* verify support for snowflake s3 stage and update docs for this

* adds named put stage to snowflake, improves exception messages, fixes and re-enables staging tests

---------

Co-authored-by: Marcin Rudolf <[email protected]>
sh-rp and rudolfix authored Jul 15, 2023
1 parent dcee9b7 commit fb9dabf
Showing 61 changed files with 1,259 additions and 317 deletions.
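
Taken together, these changes let a pipeline stage load packages on a filesystem/bucket destination before loading them into the warehouse. A minimal sketch of how the new `staging` argument might be used; pipeline name, dataset name and the bucket configuration are illustrative, not taken from this commit:

```python
import dlt

# Sketch only: assumes the filesystem staging bucket is configured via
# config/secrets, e.g. [destination.filesystem] bucket_url = "s3://my-staging-bucket"
# (illustrative values).
pipeline = dlt.pipeline(
    pipeline_name="staging_example",       # illustrative
    destination="redshift",
    staging="filesystem",                  # new: stage files before loading
    dataset_name="staging_example_data",   # illustrative
)

load_info = pipeline.run([{"id": 1}, {"id": 2}], table_name="items")
print(load_info)  # now also reports the staging destination, see dlt/common/pipeline.py below
```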
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/__init__.py
@@ -5,7 +5,7 @@
from .gcp_credentials import GcpServiceAccountCredentialsWithoutDefaults, GcpServiceAccountCredentials, GcpOAuthCredentialsWithoutDefaults, GcpOAuthCredentials, GcpCredentials # noqa: F401
from .connection_string_credentials import ConnectionStringCredentials # noqa: F401
from .api_credentials import OAuth2Credentials # noqa: F401
from .aws_credentials import AwsCredentials # noqa: F401
from .aws_credentials import AwsCredentials, AwsCredentialsWithoutDefaults # noqa: F401


# backward compatibility for service account credentials
47 changes: 24 additions & 23 deletions dlt/common/configuration/specs/aws_credentials.py
@@ -1,22 +1,38 @@
from typing import Optional, TYPE_CHECKING, Dict
from typing import Optional, TYPE_CHECKING, Dict, Any

from dlt.common.exceptions import MissingDependencyException
from dlt.common.typing import TSecretStrValue
from dlt.common.configuration.specs import CredentialsConfiguration, CredentialsWithDefault, configspec
from dlt import version

if TYPE_CHECKING:
from botocore.credentials import Credentials
from boto3 import Session


@configspec
class AwsCredentials(CredentialsConfiguration, CredentialsWithDefault):
class AwsCredentialsWithoutDefaults(CredentialsConfiguration):
# credentials without boto implementation
aws_access_key_id: str = None
aws_secret_access_key: TSecretStrValue = None
aws_session_token: Optional[TSecretStrValue] = None
aws_profile: Optional[str] = None

def to_s3fs_credentials(self) -> Dict[str, Optional[str]]:
"""Dict of keyword arguments that can be passed to s3fs"""
return dict(
key=self.aws_access_key_id,
secret=self.aws_secret_access_key,
token=self.aws_session_token,
profile=self.aws_profile
)

def to_native_representation(self) -> Dict[str, Optional[str]]:
"""Return a dict that can be passed as kwargs to boto3 session"""
d = dict(self)
d['profile_name'] = d.pop('aws_profile') # boto3 argument doesn't match env var name
return d


@configspec
class AwsCredentials(AwsCredentialsWithoutDefaults, CredentialsWithDefault):

def on_partial(self) -> None:
# Try to get default credentials
session = self._to_session()
@@ -30,27 +46,12 @@ def on_partial(self) -> None:
if not self.is_partial():
self.resolve()

def _to_session(self) -> "Session":
def _to_session(self) -> Any:
try:
import boto3
except ImportError:
raise MissingDependencyException(self.__class__.__name__, [f"{version.DLT_PKG_NAME}[s3]"])
return boto3.Session(**self.to_native_representation())

def to_native_credentials(self) -> Optional["Credentials"]:
def to_native_credentials(self) -> Optional[Any]:
return self._to_session().get_credentials()

def to_s3fs_credentials(self) -> Dict[str, Optional[str]]:
"""Dict of keyword arguments that can be passed to s3fs"""
return dict(
key=self.aws_access_key_id,
secret=self.aws_secret_access_key,
token=self.aws_session_token,
profile=self.aws_profile
)

def to_native_representation(self) -> Dict[str, Optional[str]]:
"""Return a dict that can be passed as kwargs to boto3 session"""
d = dict(self)
d['profile_name'] = d.pop('aws_profile') # boto3 argument doesn't match env var name
return d
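
The boto3-free parts of the credentials now live in `AwsCredentialsWithoutDefaults`, so staging code can hand keys to s3fs without importing boto3. A rough sketch with placeholder values; real credentials would be resolved from config/secrets:

```python
from dlt.common.configuration.specs import AwsCredentialsWithoutDefaults

creds = AwsCredentialsWithoutDefaults()
creds.aws_access_key_id = "AKIA..."        # placeholder
creds.aws_secret_access_key = "secret"     # placeholder

# kwargs for s3fs.S3FileSystem - no boto3 required
print(creds.to_s3fs_credentials())
# {'key': 'AKIA...', 'secret': 'secret', 'token': None, 'profile': None}

# kwargs for boto3.Session - note the aws_profile -> profile_name rename
print(creds.to_native_representation())
```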
4 changes: 1 addition & 3 deletions dlt/common/configuration/specs/config_providers_context.py
@@ -1,8 +1,6 @@


import contextlib
import io
from typing import Any, List
from typing import List
from dlt.common.configuration.exceptions import DuplicateConfigProviderException
from dlt.common.configuration.providers import ConfigProvider, EnvironProvider, ContextProvider, SecretsTomlProvider, ConfigTomlProvider, GoogleSecretsProvider
from dlt.common.configuration.specs.base_configuration import ContainerInjectableContext
4 changes: 3 additions & 1 deletion dlt/common/data_writers/writers.py
@@ -211,7 +211,9 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
from dlt.common.libs.pyarrow import pyarrow, get_py_arrow_datatype

# build schema
self.schema = pyarrow.schema([pyarrow.field(name, get_py_arrow_datatype(schema_item["data_type"]), nullable=schema_item["nullable"]) for name, schema_item in columns_schema.items()])
self.schema = pyarrow.schema(
[pyarrow.field(name, get_py_arrow_datatype(schema_item["data_type"], self._caps), nullable=schema_item["nullable"]) for name, schema_item in columns_schema.items()]
)
# find row items that are of the complex type (could be abstracted out for use in other writers?)
self.complex_indices = [i for i, field in columns_schema.items() if field["data_type"] == "complex"]
self.writer = pyarrow.parquet.ParquetWriter(self._f, self.schema, flavor=self.parquet_flavor, version=self.parquet_version, data_page_size=self.parquet_data_page_size)
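
The parquet writer now threads the destination capabilities into `get_py_arrow_datatype`, so decimal widths follow the destination instead of being hard-coded. A small sketch of the schema construction above, assuming `generic_capabilities` (shown in capabilities.py below) can be used to obtain default capabilities:

```python
import pyarrow

from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.libs.pyarrow import get_py_arrow_datatype

# Illustrative columns schema in the TTableSchemaColumns shape
columns_schema = {
    "name": {"data_type": "text", "nullable": False},
    "price": {"data_type": "decimal", "nullable": True},
}

caps = DestinationCapabilitiesContext.generic_capabilities("parquet")
schema = pyarrow.schema(
    [
        pyarrow.field(name, get_py_arrow_datatype(col["data_type"], caps), nullable=col["nullable"])
        for name, col in columns_schema.items()
    ]
)
print(schema)  # "price" becomes a decimal type sized by the capabilities' (precision, scale)
```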
19 changes: 17 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -1,26 +1,37 @@
from typing import Any, Callable, ClassVar, List, Literal
from typing import Any, Callable, ClassVar, List, Literal, Optional, Tuple, Set, get_args

from dlt.common.configuration.utils import serialize_value
from dlt.common.configuration import configspec
from dlt.common.configuration.specs import ContainerInjectableContext
from dlt.common.utils import identity

from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE

from dlt.common.wei import EVM_DECIMAL_PRECISION

# known loader file formats
# jsonl - new line separated json documents
# puae-jsonl - internal extract -> normalize format based on jsonl
# insert_values - insert SQL statements
# sql - any sql statement
TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet"]
TLoaderFileFormat = Literal["jsonl", "puae-jsonl", "insert_values", "sql", "parquet", "reference"]
# file formats used internally by dlt
INTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = {"puae-jsonl", "sql", "reference"}
# file formats that may be chosen by the user
EXTERNAL_LOADER_FILE_FORMATS: Set[TLoaderFileFormat] = set(get_args(TLoaderFileFormat)) - INTERNAL_LOADER_FILE_FORMATS


@configspec(init=True)
class DestinationCapabilitiesContext(ContainerInjectableContext):
"""Injectable destination capabilities required for many Pipeline stages ie. normalize"""
preferred_loader_file_format: TLoaderFileFormat
supported_loader_file_formats: List[TLoaderFileFormat]
preferred_staging_file_format: Optional[TLoaderFileFormat]
supported_staging_file_formats: List[TLoaderFileFormat]
escape_identifier: Callable[[str], str]
escape_literal: Callable[[Any], Any]
decimal_precision: Tuple[int, int]
wei_precision: Tuple[int, int]
max_identifier_length: int
max_column_identifier_length: int
max_query_length: int
@@ -39,8 +50,12 @@ def generic_capabilities(preferred_loader_file_format: TLoaderFileFormat = None)
caps = DestinationCapabilitiesContext()
caps.preferred_loader_file_format = preferred_loader_file_format
caps.supported_loader_file_formats = ["jsonl", "insert_values", "parquet"]
caps.preferred_staging_file_format = None
caps.supported_staging_file_formats = []
caps.escape_identifier = identity
caps.escape_literal = serialize_value
caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
caps.wei_precision = (EVM_DECIMAL_PRECISION, 0)
caps.max_identifier_length = 65536
caps.max_column_identifier_length = 65536
caps.max_query_length = 32 * 1024 * 1024
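
With preferred/supported staging file formats on the capabilities context and the new internal `reference` format, the loader file format can be resolved per destination/staging pair. A rough, simplified sketch of that resolution; it mirrors the fields added above, not dlt's actual code:

```python
from typing import Optional, Sequence


def pick_staging_format(
    supported_staging_formats: Sequence[str],   # from the destination capabilities
    staging_writable_formats: Sequence[str],    # what the staging destination can write
    preferred: Optional[str],
) -> Optional[str]:
    common = [f for f in supported_staging_formats if f in staging_writable_formats]
    if not common:
        # the loader would raise DestinationIncompatibleLoaderFileFormatException here
        return None
    return preferred if preferred in common else common[0]


# e.g. a warehouse destination loading via a filesystem staging (formats are illustrative)
print(pick_staging_format(["jsonl", "parquet"], ["parquet", "jsonl"], "parquet"))  # parquet
```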
27 changes: 23 additions & 4 deletions dlt/common/destination/reference.py
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from importlib import import_module
from types import TracebackType, ModuleType
from typing import ClassVar, Final, Optional, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast
from typing import ClassVar, Final, Optional, Literal, Sequence, Iterable, Type, Protocol, Union, TYPE_CHECKING, cast, List

from dlt.common import logger
from dlt.common.exceptions import IdentifierTooLongException, InvalidDestinationReference, UnknownDestinationModule
@@ -15,6 +15,7 @@
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


@configspec(init=True)
@@ -27,7 +28,8 @@ def __str__(self) -> str:
return str(self.credentials)

if TYPE_CHECKING:
def __init__(self, destination_name: str = None, credentials: Optional[CredentialsConfiguration] = None) -> None:
def __init__(self, destination_name: str = None, credentials: Optional[CredentialsConfiguration] = None
) -> None:
...


@@ -38,17 +40,33 @@ class DestinationClientDwhConfiguration(DestinationClientConfiguration):
"""dataset name in the destination to load data to, for schemas that are not default schema, it is used as dataset prefix"""
default_schema_name: Optional[str] = None
"""name of default schema to be used to name effective dataset to load data to"""
staging_credentials: Optional[CredentialsConfiguration] = None

if TYPE_CHECKING:
def __init__(
self,
destination_name: str = None,
credentials: Optional[CredentialsConfiguration] = None,
dataset_name: str = None,
default_schema_name: Optional[str] = None
default_schema_name: Optional[str] = None,
staging_credentials: Optional[CredentialsConfiguration] = None
) -> None:
...

@configspec(init=True)
class DestinationClientStagingConfiguration(DestinationClientDwhConfiguration):
as_staging: bool = False

if TYPE_CHECKING:
def __init__(
self,
destination_name: str = None,
credentials: Union[AwsCredentialsWithoutDefaults, GcpCredentials] = None,
dataset_name: str = None,
default_schema_name: Optional[str] = None,
as_staging: bool = False,
) -> None:
...

TLoadJobState = Literal["running", "failed", "retry", "completed"]

@@ -106,7 +124,8 @@ def new_file_path(self) -> str:

class FollowupJob:
"""Adds a trait that allows to create a followup job"""
pass
def create_followup_jobs(self, next_state: str) -> List[NewLoadJob]:
return []


class JobClientBase(ABC):
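
`FollowupJob` is now a real trait: a job can return new jobs to be created when it reaches a given state, which is how a staged file gets a `reference` job pointing the warehouse at it. A sketch only; dlt's concrete job classes differ:

```python
from typing import List

from dlt.common.destination.reference import FollowupJob, NewLoadJob


class StagedFileJob(FollowupJob):
    """Sketch of a staging load job that spawns followup work once it completes."""

    def __init__(self, staged_file_url: str) -> None:
        self.staged_file_url = staged_file_url

    def create_followup_jobs(self, next_state: str) -> List[NewLoadJob]:
        jobs: List[NewLoadJob] = []
        if next_state == "completed":
            # a real implementation would append a "reference" job carrying
            # self.staged_file_url so the destination (e.g. a Redshift COPY or a
            # BigQuery load job) can pick the staged file up
            pass
        return jobs
```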
35 changes: 32 additions & 3 deletions dlt/common/exceptions.py
@@ -1,4 +1,4 @@
from typing import Any, AnyStr, Sequence, Optional
from typing import Any, AnyStr, List, Sequence, Optional, Iterable


class DltException(Exception):
@@ -119,6 +119,35 @@ class DestinationTransientException(DestinationException, TransientException):
pass


class DestinationLoadingViaStagingNotSupported(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} does not support loading via staging.")


class DestinationNoStagingMode(DestinationTerminalException):
def __init__(self, destination: str) -> None:
self.destination = destination
super().__init__(f"Destination {destination} cannot be used as a staging")


class DestinationIncompatibleLoaderFileFormatException(DestinationTerminalException):
def __init__(self, destination: str, staging: str, file_format: str, supported_formats: Iterable[str]) -> None:
self.destination = destination
self.staging = staging
self.file_format = file_format
self.supported_formats = supported_formats
supported_formats_str = ", ".join(supported_formats)
if self.staging:
if not supported_formats:
msg = f"Staging {staging} cannot be used with destination {destination} because they have no file formats in common."
else:
msg = f"Unsupported file format {file_format} for destination {destination} in combination with staging destination {staging}. Supported formats: {supported_formats_str}"
else:
msg = f"Unsupported file format {file_format} destination {destination}. Supported formats: {supported_formats_str}. Check the staging option in the dlt.pipeline for additional formats."
super().__init__(msg)


class IdentifierTooLongException(DestinationTerminalException):
def __init__(self, destination_name: str, identifier_type: str, identifier_name: str, max_identifier_length: int) -> None:
self.destination_name = destination_name
@@ -129,13 +158,13 @@ def __init__(self, destination_name: str, identifier_type: str, identifier_name:


class DestinationHasFailedJobs(DestinationTerminalException):
def __init__(self, destination_name: str, load_id: str) -> None:
def __init__(self, destination_name: str, load_id: str, failed_jobs: List[Any]) -> None:
self.destination_name = destination_name
self.load_id = load_id
self.failed_jobs = failed_jobs
super().__init__(f"Destination {destination_name} has failed jobs in load package {load_id}")



class PipelineException(DltException):
def __init__(self, pipeline_name: str, msg: str) -> None:
"""Base class for all pipeline exceptions. Should not be raised."""
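
`DestinationHasFailedJobs` now carries the failed jobs themselves, so callers can report details instead of only the load id. A small sketch of how a caller might use it:

```python
from dlt.common.exceptions import DestinationHasFailedJobs


def report_failed_jobs(load_info) -> None:
    """Re-raise after printing the failed job details now attached to the exception."""
    try:
        load_info.raise_on_failed_jobs()
    except DestinationHasFailedJobs as exc:
        print(f"{exc.destination_name} has failed jobs in load package {exc.load_id}")
        for job in exc.failed_jobs:
            print(job)
        raise
```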
22 changes: 17 additions & 5 deletions dlt/common/libs/pyarrow.py
@@ -1,5 +1,7 @@
from dlt.common.exceptions import MissingDependencyException
from typing import Any
from typing import Any, Tuple

from dlt.common.destination.capabilities import DestinationCapabilitiesContext

try:
import pyarrow
@@ -8,7 +10,7 @@
raise MissingDependencyException("DLT parquet Helpers", ["parquet"], "DLT Helpers for for parquet.")


def get_py_arrow_datatype(column_type: str) -> Any:
def get_py_arrow_datatype(column_type: str, caps: DestinationCapabilitiesContext) -> Any:
if column_type == "text":
return pyarrow.string()
elif column_type == "double":
@@ -22,12 +24,22 @@ def get_py_arrow_datatype(column_type: str) -> Any:
elif column_type == "binary":
return pyarrow.binary()
elif column_type == "complex":
# return pyarrow.struct([pyarrow.field('json', pyarrow.string())])
return pyarrow.string()
elif column_type == "decimal":
return pyarrow.decimal128(38, 18)
return get_py_arrow_numeric(caps.decimal_precision)
elif column_type == "wei":
return pyarrow.decimal128(38, 0)
return get_py_arrow_numeric(caps.wei_precision)
elif column_type == "date":
return pyarrow.date32()
else:
raise ValueError(column_type)
raise ValueError(column_type)


def get_py_arrow_numeric(precision: Tuple[int, int]) -> Any:
if precision[0] <= 38:
return pyarrow.decimal128(*precision)
if precision[0] <= 76:
return pyarrow.decimal256(*precision)
# for higher precision use max precision and trim scale to leave the most significant part
return pyarrow.decimal256(76, max(0, 76 - (precision[0] - precision[1])))
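
The arrow decimal type is now derived from the destination's precision instead of a fixed `decimal128(38, 18)`. A few worked examples, assuming default capabilities:

```python
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.libs.pyarrow import get_py_arrow_datatype, get_py_arrow_numeric

caps = DestinationCapabilitiesContext.generic_capabilities("parquet")

# decimal/wei widths now come from the capabilities
print(get_py_arrow_datatype("decimal", caps))  # decimal128 with the default (precision, scale)
print(get_py_arrow_datatype("wei", caps))      # wide enough for EVM wei values

# precision above 38 digits moves to decimal256
print(get_py_arrow_numeric((76, 0)))           # decimal256(76, 0)

# above 76 digits, precision is capped and scale trimmed to keep the most
# significant part: 76 - (100 - 30) = 6
print(get_py_arrow_numeric((100, 30)))         # decimal256(76, 6)
```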
11 changes: 9 additions & 2 deletions dlt/common/pipeline.py
@@ -54,6 +54,8 @@ class LoadInfo(NamedTuple):
pipeline: "SupportsPipeline"
destination_name: str
destination_displayable_credentials: str
staging_name: str
staging_displayable_credentials: str
dataset_name: str
loads_ids: List[str]
"""ids of the loaded packages"""
@@ -79,6 +81,9 @@ def asstr(self, verbosity: int = 0) -> str:
else:
msg += "---"
msg += f"\n{len(self.loads_ids)} load package(s) were loaded to destination {self.destination_name} and into dataset {self.dataset_name}\n"
if self.staging_name:
msg += f"The {self.staging_name} staging destination used {self.staging_displayable_credentials} location to stage data\n"

msg += f"The {self.destination_name} destination used {self.destination_displayable_credentials} location to store data"
for load_package in self.load_packages:
cstr = load_package.state.upper() if load_package.completed_at else "NOT COMPLETED"
@@ -106,8 +111,9 @@ def has_failed_jobs(self) -> bool:
def raise_on_failed_jobs(self) -> None:
"""Raises `DestinationHasFailedJobs` exception if any of the load packages has a failed job."""
for load_package in self.load_packages:
if len(load_package.jobs["failed_jobs"]):
raise DestinationHasFailedJobs(self.destination_name, load_package.load_id)
failed_jobs = load_package.jobs["failed_jobs"]
if len(failed_jobs):
raise DestinationHasFailedJobs(self.destination_name, load_package.load_id, failed_jobs)

def __str__(self) -> str:
return self.asstr(verbosity=1)
@@ -128,6 +134,7 @@ class TPipelineState(TypedDict, total=False):
schema_names: Optional[List[str]]
"""All the schemas present within the pipeline working directory"""
destination: Optional[str]
staging: Optional[str]

# properties starting with _ are not automatically applied to pipeline object when state is restored
_state_version: int
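
`LoadInfo` now records the staging destination and its credentials location, and failed jobs travel with the raised exception. A small usage sketch:

```python
from dlt.common.pipeline import LoadInfo


def summarize_load(load_info: LoadInfo) -> None:
    """Print a load summary; the text now mentions the staging destination when one was used."""
    if load_info.staging_name:
        print(f"staged via {load_info.staging_name} at {load_info.staging_displayable_credentials}")
    print(load_info.asstr(verbosity=1))
    # raises DestinationHasFailedJobs with the failed jobs attached (see exceptions above)
    load_info.raise_on_failed_jobs()
```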
5 changes: 5 additions & 0 deletions dlt/common/schema/utils.py
@@ -418,6 +418,11 @@ def get_write_disposition(tables: TSchemaTables, table_name: str) -> TWriteDispo
raise ValueError(f"No write disposition found in the chain of tables for '{table_name}'.")


def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
"""Checks if `table` schema contains column with type _typ"""
return any(c["data_type"] == _typ for c in table["columns"].values())


def get_top_level_table(tables: TSchemaTables, table_name: str) -> TTableSchema:
"""Finds top level (without parent) of a `table_name` following the ancestry hierarchy."""
table = tables[table_name]
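
`table_schema_has_type` is a small helper for the automatic file-format resolution mentioned in the commit messages, e.g. checking whether a table carries columns of a type a given staged format or destination cannot represent. An illustrative use; the table schema and the decision rule are examples, not dlt's exact logic:

```python
from dlt.common.schema.typing import TTableSchema
from dlt.common.schema.utils import table_schema_has_type

# Illustrative table schema; in dlt this would come from the pipeline's Schema
table: TTableSchema = {
    "name": "items",
    "columns": {
        "id": {"name": "id", "data_type": "bigint", "nullable": False},
        "payload": {"name": "payload", "data_type": "complex", "nullable": True},
    },
}

# e.g. fall back to jsonl staging when complex columns are present
use_jsonl = table_schema_has_type(table, "complex")
print(use_jsonl)  # True
```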