add ff and lovac in 2020 #907

Open · wants to merge 4 commits into base: feat/data-setup-dagster
2 changes: 1 addition & 1 deletion analytics/dagster/src/assets/notion.py
@@ -5,7 +5,7 @@
from ..dlt_sources.notion import notion_databases

@dlt_assets(
    dlt_source=notion_databases(database_ids=[{"id": "a57fc47a6e3b4ebd835cf0d7a5460e29"}], api_key="secret_kU0GYOB4NMjni8ObBrrVspQkZTcmlUgZ3YdguzwubBP"),
    dlt_source=notion_databases(database_ids=[{"id": "a57fc47a6e3b4ebd835cf0d7a5460e29"}]),
    dlt_pipeline=pipeline(
        pipeline_name="notion",
        dataset_name="notion_dataset",
30 changes: 30 additions & 0 deletions analytics/dagster/src/dlt_sources/.dlt/.sources
@@ -82,3 +82,33 @@ sources:
        git_sha: e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
        sha3_256: a7ffc6f8bf1ed76651c14756a061d662f580ff4de43b49fa82d80a4b80f8434a
    dlt_version_constraint: '>=0.3.5'
  filesystem:
    is_dirty: false
    last_commit_sha: f413db6463ff7f1412a5ec69009eb7200e58a3c0
    last_commit_timestamp: '2024-09-29T19:35:17+02:00'
    files:
      filesystem/readers.py:
        commit_sha: f413db6463ff7f1412a5ec69009eb7200e58a3c0
        git_sha: bd5f3575c2331d77744b773444a1de9f4efdc97d
        sha3_256: db96f60544917574136cb03fcec6fc0c86ffc6d1d553b857038297368df877ec
      filesystem/__init__.py:
        commit_sha: f413db6463ff7f1412a5ec69009eb7200e58a3c0
        git_sha: c6452543ed36d80bef8c09992dbcae132320c1ca
        sha3_256: 9fa787128df36e06c70ab8be402df53c310b2b47e99a840f159fb2f7594d15b8
      filesystem/setup_script_gcp_oauth.py:
        commit_sha: f413db6463ff7f1412a5ec69009eb7200e58a3c0
        git_sha: a3ae7ac145d388eb4eefd3e7f322502c33936d32
        sha3_256: a82f291fafc52b778eafb514155a8dbc583622e72de97440fb5a128924e1bfde
      filesystem/README.md:
        commit_sha: f413db6463ff7f1412a5ec69009eb7200e58a3c0
        git_sha: 4a57650df7cfcc49e3c10540b3f870390c597a45
        sha3_256: 49d90d9d82cf560df8fdc5375dda63c1aa52692d1360e473426f14a2e1a9eaf1
      filesystem/settings.py:
        commit_sha: f413db6463ff7f1412a5ec69009eb7200e58a3c0
        git_sha: 33fcb55b5f694667d865d498c6a6481cf299b504
        sha3_256: cdf7740b7c845ef57bd0f7b65c43e8a12d2c29550f343ae1a5267cafeddaaca4
      filesystem/helpers.py:
        commit_sha: f413db6463ff7f1412a5ec69009eb7200e58a3c0
        git_sha: df67f999ddd4c46e389eba64f8b57ccef83cc285
        sha3_256: 82d1ecdd13ad84c642da72abae736493ea040e1b50c458859651bfc4ffad4df8
    dlt_version_constraint: <1,>=0.4.3a0
3 changes: 3 additions & 0 deletions analytics/dagster/src/dlt_sources/.dlt/config.toml
@@ -1,2 +1,5 @@
[sources.filesystem]
bucket_url = "bucket_url" # please set me up!

[runtime]
dlthub_telemetry = true
109 changes: 109 additions & 0 deletions analytics/dagster/src/dlt_sources/filesystem/README.md
@@ -0,0 +1,109 @@
# Readers Source & Filesystem

This verified source streams files from AWS S3, GCS, Azure, or a local filesystem using the `readers` source.

Sources and resources that can be used with this verified source are:


| Name | Type | Description |
|-----------------|----------------------|---------------------------------------------------------------------------|
| readers | Source | Lists and reads files with resource `filesystem` and readers transformers |
| filesystem | Resource | Lists files in `bucket_url` using `file_glob` pattern |
| read_csv        | Resource-transformer | Reads CSV files with Pandas chunk by chunk                                 |
| read_csv_duckdb | Resource-transformer | Reads CSV files with the DuckDB engine chunk by chunk                      |
| read_jsonl      | Resource-transformer | Reads JSONL file content and extracts the data                             |
| read_parquet    | Resource-transformer | Reads Parquet file content and extracts the data with PyArrow              |


## Initialize the source

```shell
dlt init filesystem duckdb
```

Here, we chose `duckdb` as the destination. Alternatively, you can choose `redshift`, `bigquery`, or
any of the other [destinations](https://dlthub.com/docs/dlt-ecosystem/destinations/).
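
Once a destination is chosen, the pipeline itself is created in Python. A minimal sketch, assuming `duckdb` as above (the pipeline and dataset names are illustrative):

```python
import dlt

# Minimal pipeline targeting the duckdb destination chosen above;
# pipeline_name and dataset_name are placeholders.
pipeline = dlt.pipeline(
    pipeline_name="filesystem_example",
    destination="duckdb",
    dataset_name="filesystem_data",
)
```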

## Setup verified source

To obtain the credentials for AWS S3, Google Cloud Storage, or Azure Blob Storage and initialize the
pipeline, please refer to the
[full documentation](https://dlthub.com/docs/dlt-ecosystem/verified-sources/filesystem).

## Add credentials

1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive
information securely, like access tokens. Keep this file safe. Here's its format for service
account authentication:

```toml
[sources.filesystem.credentials] # use [sources.readers.credentials] for the "readers" source
# For AWS S3 access:
aws_access_key_id="Please set me up!"
aws_secret_access_key="Please set me up!"

# For GCS storage bucket access:
client_email="Please set me up!"
private_key="Please set me up!"
project_id="Please set me up!"

# For Azure blob storage access:
azure_storage_account_name="Please set me up!"
azure_storage_account_key="Please set me up!"
```

1. Finally, enter credentials for your chosen destination as per the [docs](../destinations/).

1. You can pass the bucket URL and glob pattern or use `config.toml`. For local filesystems, use
   `file://` or omit the scheme entirely.

```toml
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="~/Documents/csv_files/"
file_glob="*"
```

For remote filesystems you need to add the scheme; it is used to determine the protocol being
used, for example:

```toml
[sources.filesystem] # use [sources.readers.credentials] for the "readers" source
bucket_url="s3://my-bucket/csv_files/"
```

## Usage

Use `filesystem` as a
[standalone resource](https://dlthub.com/docs/general-usage/resource#declare-a-standalone-resource)
to enumerate S3, GCS, and Azure bucket files.

```python
files = filesystem(bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv")
pipeline.run(files)
```

Use the `readers` source to enumerate and **read** chunked
`csv`, `jsonl`, and `parquet` bucket files.

```python
files = readers(
bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv"
).read_csv()
pipeline.run(files.with_name("table_name"))
```

We advise that you give each resource a specific name (via `with_name` in the example above)
before loading with `pipeline.run`. This ensures that data goes to a table with the name you
want and that each pipeline uses a separate state for incremental loading.
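
Putting it together, a hedged end-to-end sketch (the import path, pipeline, and table names below are illustrative assumptions):

```python
import dlt

# Assumed import path for this verified source within your project.
from filesystem import readers

pipeline = dlt.pipeline(
    pipeline_name="filesystem_csv", destination="duckdb", dataset_name="csv_data"
)

# Naming the resource explicitly sends the data to the `sales` table and keeps
# a separate incremental-loading state for it.
files = readers(
    bucket_url="s3://my_bucket/data", file_glob="csv_folder/*.csv"
).read_csv()
load_info = pipeline.run(files.with_name("sales"))
print(load_info)
```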

> Adding a new file reader is straightforward. For demos, see
> [filesystem_pipeline.py](../filesystem_pipeline.py). We welcome contributions for any file type,
> including PDFs and Excel files.
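
As a rough illustration, here is a hedged sketch of what such a custom reader transformer could look like; the `read_text` name and row layout are assumptions, not part of this source:

```python
from typing import Iterator, List

import dlt
from dlt.sources.filesystem import FileItemDict


# Hypothetical reader: opens each listed file and yields one row per decoded
# text line, keyed by the originating file name.
@dlt.transformer(standalone=True)
def read_text(items: List[FileItemDict]) -> Iterator[dict]:
    for file_obj in items:
        with file_obj.open(mode="rb") as f:
            for line in f:
                yield {
                    "file_name": file_obj["file_name"],
                    "line": line.decode("utf-8").rstrip("\n"),
                }
```

It could then be piped like the built-in readers, e.g. `filesystem(bucket_url, file_glob="*.txt") | read_text()`.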

💡 To explore additional customizations for this pipeline, we recommend referring to the official
`dlt` [Readers Source & Filesystem verified source](https://dlthub.com/docs/dlt-ecosystem/verified-sources/filesystem)
documentation. It provides comprehensive guidance on further customizing and tailoring the
pipeline to your specific needs.
99 changes: 99 additions & 0 deletions analytics/dagster/src/dlt_sources/filesystem/__init__.py
@@ -0,0 +1,99 @@
"""Reads files in s3, gs or azure buckets using fsspec and provides convenience resources for chunked reading of various file formats"""
from typing import Iterator, List, Optional, Tuple, Union

import dlt
from dlt.common.typing import copy_sig
from dlt.sources import DltResource
from dlt.sources.filesystem import FileItem, FileItemDict, fsspec_filesystem, glob_files
from dlt.sources.credentials import FileSystemCredentials

from .helpers import (
    AbstractFileSystem,
    FilesystemConfigurationResource,
)
from .readers import (
    ReadersSource,
    _read_csv,
    _read_csv_duckdb,
    _read_jsonl,
    _read_parquet,
)
from .settings import DEFAULT_CHUNK_SIZE


@dlt.source(_impl_cls=ReadersSource, spec=FilesystemConfigurationResource)
def readers(
    bucket_url: str = dlt.secrets.value,
    credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
    file_glob: Optional[str] = "*",
) -> Tuple[DltResource, ...]:
    """This source provides a few resources that are chunked file readers. Readers can be further parametrized before use:
        read_csv(chunksize, **pandas_kwargs)
        read_jsonl(chunksize)
        read_parquet(chunksize)

    Args:
        bucket_url (str): The url to the bucket.
        credentials (FileSystemCredentials | AbstractFileSystem): The credentials to the filesystem or an fsspec `AbstractFileSystem` instance.
        file_glob (str, optional): The filter to apply to the files in glob format. By default lists all files in `bucket_url` non-recursively.
    """
    return (
        filesystem(bucket_url, credentials, file_glob=file_glob)
        | dlt.transformer(name="read_csv")(_read_csv),
        filesystem(bucket_url, credentials, file_glob=file_glob)
        | dlt.transformer(name="read_jsonl")(_read_jsonl),
        filesystem(bucket_url, credentials, file_glob=file_glob)
        | dlt.transformer(name="read_parquet")(_read_parquet),
        filesystem(bucket_url, credentials, file_glob=file_glob)
        | dlt.transformer(name="read_csv_duckdb")(_read_csv_duckdb),
    )


@dlt.resource(
    primary_key="file_url", spec=FilesystemConfigurationResource, standalone=True
)
def filesystem(
    bucket_url: str = dlt.secrets.value,
    credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
    file_glob: Optional[str] = "*",
    files_per_page: int = DEFAULT_CHUNK_SIZE,
    extract_content: bool = False,
) -> Iterator[List[FileItem]]:
    """This resource lists files in `bucket_url` using the `file_glob` pattern. The files are yielded as FileItem, which also
    provides methods to open and read file data. It should be combined with transformers that further process (i.e. load) the files.

    Args:
        bucket_url (str): The url to the bucket.
        credentials (FileSystemCredentials | AbstractFileSystem): The credentials to the filesystem or an fsspec `AbstractFileSystem` instance.
        file_glob (str, optional): The filter to apply to the files in glob format. By default lists all files in `bucket_url` non-recursively.
        files_per_page (int, optional): The number of files to process at once, defaults to 100.
        extract_content (bool, optional): If true, the content of the file will be extracted;
            if false, an fsspec file is returned. Defaults to False.

    Returns:
        Iterator[List[FileItem]]: The list of files.
    """
    if isinstance(credentials, AbstractFileSystem):
        fs_client = credentials
    else:
        fs_client = fsspec_filesystem(bucket_url, credentials)[0]

    files_chunk: List[FileItem] = []
    for file_model in glob_files(fs_client, bucket_url, file_glob):
        file_dict = FileItemDict(file_model, credentials)
        if extract_content:
            file_dict["file_content"] = file_dict.read_bytes()
        files_chunk.append(file_dict)  # type: ignore

        # wait for the chunk to be full
        if len(files_chunk) >= files_per_page:
            yield files_chunk
            files_chunk = []
    if files_chunk:
        yield files_chunk


read_csv = dlt.transformer(standalone=True)(_read_csv)
read_jsonl = dlt.transformer(standalone=True)(_read_jsonl)
read_parquet = dlt.transformer(standalone=True)(_read_parquet)
read_csv_duckdb = dlt.transformer(standalone=True)(_read_csv_duckdb)
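
For reference, a hedged sketch of how the standalone resource and transformers above could be combined in a pipeline (the import path and names are assumptions):

```python
import dlt

# Assumed import path for the module shown above.
from filesystem import filesystem, read_csv

pipeline = dlt.pipeline(
    pipeline_name="files_demo", destination="duckdb", dataset_name="files"
)

# Pipe the standalone listing resource into the standalone read_csv transformer
# defined at the bottom of the module, then load under an explicit table name.
csv_files = filesystem(bucket_url="s3://my_bucket/data", file_glob="*.csv") | read_csv()
pipeline.run(csv_files.with_name("csv_table"))
```
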
98 changes: 98 additions & 0 deletions analytics/dagster/src/dlt_sources/filesystem/helpers.py
@@ -0,0 +1,98 @@
"""Helpers for the filesystem resource."""
from typing import Any, Dict, Iterable, List, Optional, Type, Union
from fsspec import AbstractFileSystem # type: ignore

import dlt
from dlt.common.configuration import resolve_type
from dlt.common.typing import TDataItem

from dlt.sources import DltResource
from dlt.sources.filesystem import fsspec_filesystem
from dlt.sources.config import configspec, with_config
from dlt.sources.credentials import (
    CredentialsConfiguration,
    FilesystemConfiguration,
    FileSystemCredentials,
)

from .settings import DEFAULT_CHUNK_SIZE


@configspec
class FilesystemConfigurationResource(FilesystemConfiguration):
    credentials: Union[FileSystemCredentials, AbstractFileSystem] = None
    file_glob: Optional[str] = "*"
    files_per_page: int = DEFAULT_CHUNK_SIZE
    extract_content: bool = False

    @resolve_type("credentials")
    def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
        # use known credentials or empty credentials for unknown protocol
        return Union[self.PROTOCOL_CREDENTIALS.get(self.protocol) or Optional[CredentialsConfiguration], AbstractFileSystem]  # type: ignore[return-value]


def fsspec_from_resource(filesystem_instance: DltResource) -> AbstractFileSystem:
    """Extract authorized fsspec client from a filesystem resource"""

    @with_config(
        spec=FilesystemConfiguration,
        sections=("sources", filesystem_instance.section, filesystem_instance.name),
    )
    def _get_fsspec(
        bucket_url: str, credentials: Optional[FileSystemCredentials]
    ) -> AbstractFileSystem:
        return fsspec_filesystem(bucket_url, credentials)[0]

    return _get_fsspec(
        filesystem_instance.explicit_args.get("bucket_url", dlt.config.value),
        filesystem_instance.explicit_args.get("credentials", dlt.secrets.value),
    )


def add_columns(columns: List[str], rows: List[List[Any]]) -> List[Dict[str, Any]]:
    """Adds column names to the given rows.

    Args:
        columns (List[str]): The column names.
        rows (List[List[Any]]): The rows.

    Returns:
        List[Dict[str, Any]]: The rows with column names.
    """
    result = []
    for row in rows:
        result.append(dict(zip(columns, row)))

    return result


def fetch_arrow(file_data, chunk_size: int) -> Iterable[TDataItem]:  # type: ignore
    """Fetches data from the given CSV file.

    Args:
        file_data (DuckDBPyRelation): The CSV file data.
        chunk_size (int): The number of rows to read at once.

    Yields:
        Iterable[TDataItem]: Data items, read from the given CSV file.
    """
    batcher = file_data.fetch_arrow_reader(batch_size=chunk_size)
    yield from batcher


def fetch_json(file_data, chunk_size: int) -> Iterable[TDataItem]:  # type: ignore
    """Fetches data from the given CSV file.

    Args:
        file_data (DuckDBPyRelation): The CSV file data.
        chunk_size (int): The number of rows to read at once.

    Yields:
        Iterable[TDataItem]: Data items, read from the given CSV file.
    """
    while True:
        batch = file_data.fetchmany(chunk_size)
        if not batch:
            break

        yield add_columns(file_data.columns, batch)
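
To illustrate how these DuckDB helpers are meant to be driven, a hedged sketch (assumes `duckdb` is installed and a local `data.csv` exists; not part of this PR):

```python
import duckdb

# fetch_json is the helper defined above; the import path is an assumption.
from filesystem.helpers import fetch_json

# Build a DuckDBPyRelation over a CSV file and stream it chunk by chunk;
# each batch is a list of dicts keyed by column name.
rel = duckdb.sql("SELECT * FROM read_csv_auto('data.csv')")
for batch in fetch_json(rel, chunk_size=1_000):
    print(len(batch), batch[:1])
```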