docs: Add Dagster custom assets tutorial (#1728)
* Also creates a generic snippet for tracking deployments
* Deprecate Airbyte/Cloudquery docs
* Update Next steps across `integrate` guides
ryscheng authored Jun 28, 2024
1 parent ae9cf44 commit 335d532
Showing 12 changed files with 314 additions and 30 deletions.
41 changes: 25 additions & 16 deletions apps/docs/docs/contribute/connect-data/airbyte.md
@@ -4,15 +4,28 @@ sidebar_position: 7
sidebar_class_name: hidden
---

:::warning
While Dagster does have support for integrating with
[Airbyte](https://docs.dagster.io/integrations/airbyte),
we do not currently run an Airbyte server.
We prefer using Dagster
[embedded-elt](https://docs.dagster.io/_apidocs/libraries/dagster-embedded-elt).
If you have tips on how to run Airbyte plugins within
embedded-elt, please send us a PR!
:::

## Replicating external databases

If your data exists in an off-the-shelf database,
you can replicate data to OSO via an Airbyte Connector or
Singer.io Tap integration through Meltano. This section provides the details
Singer.io Tap integration through Meltano, which is run in a GitHub Actions workflow.
This configuration does not require running an Airbyte server.

This section provides the details
necessary to add a connector or a tap from an existing Postgres database into
our system. Other databases or datasources should be similar.

### Setting up your postgres database for connection
### Setting up your Postgres database for connection

We will set up the Postgres connection to use Change Data Capture, which is
suggested for very large databases. You will need to have the following in order
@@ -107,10 +120,10 @@ CREATE PUBLICATION oso_publication FOR ALL TABLES;

For more details about this command see: https://www.postgresql.org/docs/current/sql-createpublication.html

### Adding your postgres replication data to the OSO meltano configuration
### Adding your Postgres replication data to the OSO Meltano configuration

Assuming that you've created the publication, you're now ready to connect your
postgres data source to OSO.
In the future we will move everything over to our Dagster setup.
For now, we will run Airbyte plugins via Meltano in a GitHub Actions workflow.

#### Add the extractor to `meltano.yml`

@@ -149,26 +162,22 @@ extractors:
For now, once this is all completed, it is best to open a pull request and an OSO
maintainer will reach out with a method to accept the read-only credentials.
### Adding to Dagster
:::warning
Coming soon... This section is a work in progress.
To track progress, see this
[GitHub issue](https://github.com/opensource-observer/oso/issues/1318)
:::
## Writing a new Airbyte connector
Airbyte provides one of the best ways to write data connectors
that ingest data from HTTP APIs and other Python sources via the
[Airbyte Python CDK](https://docs.airbyte.com/connector-development/cdk-python/).
:::warning
Coming soon... This section is a work in progress.
This section is a work in progress.
:::
## Airbyte examples in OSO
## Adding to Dagster
:::warning
Coming soon... This section is a work in progress.
This section is a work in progress.
We would love to see a way to integrate Airbyte plugins
into a Dagster asset (possibly via Meltano).
To track progress, see this
[GitHub issue](https://github.com/opensource-observer/oso/issues/1318).
:::
24 changes: 14 additions & 10 deletions apps/docs/docs/contribute/connect-data/cloudquery.md
@@ -4,6 +4,14 @@ sidebar_position: 8
sidebar_class_name: hidden
---

:::warning
While it is possible to run CloudQuery plugins
[inside Dagster embedded-elt](https://www.cloudquery.io/blog/how-to-run-efficient-embedded-elt-cloudquery-workflows-inside-dagster),
we haven't done it ourselves yet.
If you have tips on how to get this going with
embedded-elt, please send us a PR!
:::

[CloudQuery](https://cloudquery.io) can be used to integrate external data sources
into the OSO platform. At this time, we are limiting the
CloudQuery plugins in the OSO repository to Python or TypeScript.
@@ -262,14 +270,10 @@ pipeline.
### Adding to Dagster
:::warning
Coming soon... This section is a work in progress.
To track progress, see this
[GitHub issue](https://github.com/opensource-observer/oso/issues/1325)
This section is a work in progress.
We are in the process of moving everything to Dagster embedded-elt.
If you want to help, check out this
[blog](https://www.cloudquery.io/blog/how-to-run-efficient-embedded-elt-cloudquery-workflows-inside-dagster)
and track progress in this
[GitHub issue](https://github.com/opensource-observer/oso/issues/1325).
:::
## CloudQuery examples in OSO
Here are a few examples of CloudQuery plugins currently in use:
- [Importing oss-directory](https://github.com/opensource-observer/oso/tree/main/warehouse/cloudquery-oss-directory)
- [Fetch GitHub data missing from GHArchive](https://github.com/opensource-observer/oso/tree/main/warehouse/cloudquery-github-resolve-repos)
99 changes: 99 additions & 0 deletions apps/docs/docs/contribute/connect-data/dagster-config.mdx
@@ -0,0 +1,99 @@
---
title: Dagster Next Steps
sidebar_class_name: hidden
---

## Add the asset to the OSO Dagster configuration

### Submit a pull request

When you are ready to deploy,
submit a pull request of your changes to
[OSO](https://github.com/opensource-observer/oso).
OSO maintainers will work with you to get the code in shape for merging.
For more details on contributing to OSO, check out
[CONTRIBUTING.md](https://github.com/opensource-observer/oso/blob/main/CONTRIBUTING.md).

### Verify deployment

After your pull request merges, give our Dagster deployment up to 24 hours to reload.
You should be able to find your new asset
in the [global asset list](https://dagster.opensource.observer/assets).

![Dagster assets](./dagster_assets.png)

If your asset is missing, you can check for loading errors
and the date of the last code load in the
[Deployment tab](https://dagster.opensource.observer/locations).
For example, if your code has a bug and leads to a loading error,
it may look like this:

![Dagster deployment](./dagster_deployments.png)

### Run it!

If this is your first time adding an asset,
we suggest reaching out to the OSO team over
[Discord](https://www.opensource.observer/discord)
to run deploys manually.
You can monitor all Dagster runs
[here](https://dagster.opensource.observer/runs).

![Dagster run example](./dagster_run.png)

Dagster also provides
[automation](https://docs.dagster.io/concepts/automation)
to run jobs on a
[schedule](https://docs.dagster.io/concepts/automation/schedules)
(e.g. daily), after detecting a condition using a Python-defined
[sensor](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors)
(e.g. when a file appears in GCS),
and using
[auto-materialization policies](https://docs.dagster.io/concepts/assets/asset-auto-execution).
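
For instance, a daily schedule for a job built from an asset selection might look like the following sketch (the job name, asset key prefix, and cron expression are illustrative assumptions, not an existing OSO job; the schedule would still need to be registered in the Dagster definitions):

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# Build a job from an asset selection (here, everything under a hypothetical "example" key prefix)
example_job = define_asset_job(
    name="example_daily_job",
    selection=AssetSelection.key_prefixes("example"),
)

# Materialize that job every day at 03:00 UTC
example_daily_schedule = ScheduleDefinition(
    job=example_job,
    cron_schedule="0 3 * * *",
)
```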

We welcome any automation that can reduce the operational burden
of our continuous deployment.
However, before merging any job automation,
please reach out to the OSO devops team
on [Discord](https://www.opensource.observer/discord)
with an estimate of costs, especially if it involves large BigQuery scans.
We will reject or disable any jobs that lead to
increased infrastructure instability or unexpected costs.

## Defining a dbt source

In this example, we create a dbt source in `oso/warehouse/dbt/models/`
(see [source](https://github.com/opensource-observer/oso/blob/main/warehouse/dbt/models/ethereum_sources.yml))
for the Ethereum mainnet
[public dataset](https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-public-dataset-smart-contract-analytics).

```yaml
sources:
  - name: ethereum
    database: bigquery-public-data
    schema: crypto_ethereum
    tables:
      - name: transactions
        identifier: transactions
      - name: traces
        identifier: traces
```
This will make it clear to OSO dbt users that the data is available for use.
We can then reference these tables in downstream models with
the `source` macro:

```sql
select
  block_timestamp,
  `hash` as transaction_hash,
  from_address,
  receipt_contract_address
from {{ source("ethereum", "transactions") }}
```

## Next steps

- [**SQL Query Guide**](../../integrate/query-data.mdx): run queries against the data you just loaded
- [**Connect OSO to 3rd Party tools**](../../integrate/3rd-party.mdx): explore your data using tools like Hex.tech, Tableau, and Metabase
- [**Write a dbt model**](../impact-models.md): contribute new impact and data models to our data pipeline
154 changes: 153 additions & 1 deletion apps/docs/docs/contribute/connect-data/dagster.md
@@ -1,4 +1,156 @@
---
title: 🏗️ Custom Dagster Assets
title: Writing Custom Dagster Assets
sidebar_position: 5
---

import NextSteps from "./dagster-config.mdx"

Before writing a fully custom Dagster asset,
we recommend you first check whether the previous guides on
[BigQuery datasets](./bigquery.md),
[database replication](./database.md),
or [API crawling](./api.md)
may be a better fit.
This guide should only be used in the rare cases where you cannot
use the other methods.

## Write your Dagster asset

Dagster provides a great
[tutorial](https://docs.dagster.io/integrations/bigquery/using-bigquery-with-dagster)
on writing Dagster assets that write to BigQuery.

At a high level, there are two possible pathways:

- [**Option 1**: Using the BigQuery resource](https://docs.dagster.io/integrations/bigquery/using-bigquery-with-dagster#option-1-using-the-bigquery-resource)
By using the BigQuery resource, you execute SQL queries directly on BigQuery
to create and interact with tables. This is ideal for large datasets where you do not
want the Dagster process to be in the critical path of dataflow.
Rather, Dagster orchestrates jobs remotely out-of-band (see the sketch after this list).

- [**Option 2**: BigQuery I/O Manager](https://docs.dagster.io/integrations/bigquery/using-bigquery-with-dagster#option-2-using-the-bigquery-io-manager)
This method offers significantly more control by loading data into a DataFrame
in the Dagster process for arbitrary computation.
However, because Dagster is now on the critical path of the dataflow,
this can lead to performance issues, especially if the data does not
easily fit in memory.
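
As a rough illustration of Option 1, a BigQuery-resource asset might look like the following sketch (the asset name, dataset, and query are illustrative assumptions, not an existing OSO asset):

```python
from dagster import asset
from dagster_gcp import BigQueryResource

@asset
def example_summary_table(bigquery: BigQueryResource) -> None:
    # The query runs entirely inside BigQuery; Dagster only orchestrates the job
    with bigquery.get_client() as client:
        client.query(
            "CREATE OR REPLACE TABLE example.daily_summary AS "
            "SELECT DATE(block_timestamp) AS day, COUNT(*) AS txns "
            "FROM example.transactions "
            "GROUP BY day"
        ).result()
```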

Assets should be added to
`warehouse/oso_dagster/assets/`.
All assets defined in this directory are automatically
loaded into Dagster.

For an example of a custom Dagster asset, check out the
[asset for oss-directory](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets/ossd.py),
where we use the `oss-directory` Python package
to fetch data from the
[oss-directory repo](https://github.com/opensource-observer/oss-directory/)
and load it into BigQuery using the I/O manager approach.
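
As a minimal sketch of that I/O-manager style (the module path, asset name, and returned columns below are illustrative assumptions, not the actual `ossd.py` code):

```python
# warehouse/oso_dagster/assets/example.py (hypothetical)
import pandas as pd
from dagster import asset

@asset(key_prefix="example", io_manager_key="io_manager")
def example_projects() -> pd.DataFrame:
    # Fetch data from any Python source and return a DataFrame;
    # the configured BigQuery I/O manager persists it as a warehouse table.
    return pd.DataFrame({"project_slug": ["oso"], "star_count": [100]})
```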

## Creating an asset factory (optional)

If your asset represents a common pattern,
we encourage you to turn it into a factory.
For example, our
[tutorial on Google Cloud Storage-based assets](./gcs.md)
is an example of a factory pattern.

:::tip
We suggest creating a vanilla asset and seeing it work in production
before abstracting it into a factory.
:::

In order to create a factory, add it to the
`warehouse/oso_dagster/factories/` directory.
The factory needs to return an `AssetFactoryResponse`
that represents the different aspects of an asset,
including sensors, jobs, and checks.

```python
# warehouse/oso_dagster/factories/common.py
@dataclass
class AssetFactoryResponse:
    assets: List[AssetsDefinition]
    sensors: List[SensorDefinition] = field(default_factory=lambda: [])
    jobs: List[JobDefinition] = field(default_factory=lambda: [])
    checks: List[AssetChecksDefinition] = field(default_factory=lambda: [])
```

For example, below we share the method signature
of the GCS factory:

```python
# warehouse/oso_dagster/factories/gcs.py

## Factory configuration
@dataclass(kw_only=True)
class IntervalGCSAsset(BaseGCSAsset):
    interval: Interval
    mode: SourceMode
    retention_days: int

## Factory function
def interval_gcs_import_asset(config: IntervalGCSAsset):

    # Asset definition
    @asset(name=config.name, key_prefix=config.key_prefix, **config.asset_kwargs)
    def gcs_asset(
        context: AssetExecutionContext, bigquery: BigQueryResource, gcs: GCSResource
    ) -> MaterializeResult:
        # Load GCS files into BigQuery
        ...

    # Asset sensor definition
    @asset_sensor(
        asset_key=gcs_asset.key,
        name=f"{config.name}_clean_up_sensor",
        job=gcs_clean_up_job,
        default_status=DefaultSensorStatus.STOPPED,
    )
    def gcs_clean_up_sensor(
        context: SensorEvaluationContext, gcs: GCSResource, asset_event: EventLogEntry
    ):
        # Detect when we can cleanup old data files on GCS
        ...

    # Asset cleanup job
    @job(name=f"{config.name}_clean_up_job")
    def gcs_clean_up_job():
        # Delete old files on GCS
        ...

    # Return an instance
    return AssetFactoryResponse([gcs_asset], [gcs_clean_up_sensor], [gcs_clean_up_job])
```

Then you can instantiate the asset with the factory
in `warehouse/oso_dagster/assets/`.
For example, we get periodic Parquet file dumps
from Gitcoin Passport into the GCS bucket named
`oso-dataset-transfer-bucket`.
Using the GCS factory,
we load the data into the BigQuery table named
`gitcoin.passport_scores`.

```python
# warehouse/oso_dagster/assets/gitcoin.py
gitcoin_passport_scores = interval_gcs_import_asset(
    IntervalGCSAsset(
        key_prefix="gitcoin",
        name="passport_scores",
        project_id="opensource-observer",
        bucket_name="oso-dataset-transfer-bucket",
        path_base="passport",
        file_match=r"(?P<interval_timestamp>\d\d\d\d-\d\d-\d\d)/scores.parquet",
        destination_table="passport_scores",
        raw_dataset_name="oso_raw_sources",
        clean_dataset_name="gitcoin",
        interval=Interval.Daily,
        mode=SourceMode.Overwrite,
        retention_days=10,
        format="PARQUET",
    ),
)
```

<NextSteps components={props.components}/>
4 changes: 4 additions & 0 deletions apps/docs/docs/contribute/connect-data/gcs.md
@@ -3,6 +3,8 @@ title: 🏗️ Connect via Google Cloud Storage (GCS)
sidebar_position: 4
---

import NextSteps from "./dagster-config.mdx"

Depending on the data, we may accept data dumps
into our Google Cloud Storage (GCS).
If you believe your data storage qualifies to be sponsored
@@ -35,3 +37,5 @@ see the [Dagster tutorial](https://docs.dagster.io/tutorial).
- [Superchain data](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py)
- [Gitcoin Passport scores](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py)
- [OpenRank reputations on Farcaster](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py)

<NextSteps components={props.components}/>
4 changes: 2 additions & 2 deletions apps/docs/docs/contribute/connect-data/index.md
@@ -16,8 +16,8 @@ in order of preference:
4. [**Files into Google Cloud Storage (GCS)**](./gcs.md): You can drop Parquet/CSV files in our GCS bucket for loading into BigQuery.
5. [**Custom Dagster assets**](./dagster.md): Write a custom Dagster asset for other unique data sources.
6. **Static files**: If the data is high quality and can only be imported via static files, please reach out to us on [Discord](https://www.opensource.observer/discord) to coordinate hand-off. This path is predominantly used for [grant funding data](./funding-data.md).
7. (deprecated) [Airbyte plugins](./airbyte.md): Airbyte plugins are the preferred method for crawling APIs.
8. (deprecated) [CloudQuery plugins](./cloudquery.md): CloudQuery offers another, more flexible avenue for writing data import plugins.
7. (deprecated) [Airbyte](./airbyte.md): a modern ELT tool
8. (deprecated) [CloudQuery](./cloudquery.md): a modern ELT tool

We generally prefer to work with data partners that can help us regularly
index live data that can feed our daily data pipeline.