From dd51851ca0b97892b4f0528f029924c2b26290df Mon Sep 17 00:00:00 2001 From: Raymond Cheng Date: Sun, 30 Jun 2024 15:26:16 -0700 Subject: [PATCH] docs: GCS factory assets (#1732) * Added a guide for adding GCS-based assets * Added partial comments for how to configure the factory * Removing a.out and boop.txt --- a.out | 0 .../connect-data/dagster-config.mdx | 13 +-- .../docs/contribute/connect-data/dagster.md | 7 +- apps/docs/docs/contribute/connect-data/gcs.md | 80 ++++++++++++++++--- boop.txt | 40 ---------- warehouse/oso_dagster/factories/gcs.py | 16 +++- 6 files changed, 93 insertions(+), 63 deletions(-) delete mode 100644 a.out delete mode 100644 boop.txt diff --git a/a.out b/a.out deleted file mode 100644 index e69de29bb..000000000 diff --git a/apps/docs/docs/contribute/connect-data/dagster-config.mdx b/apps/docs/docs/contribute/connect-data/dagster-config.mdx index 6ec1960ef..4049b4fb0 100644 --- a/apps/docs/docs/contribute/connect-data/dagster-config.mdx +++ b/apps/docs/docs/contribute/connect-data/dagster-config.mdx @@ -10,13 +10,14 @@ sidebar_class_name: hidden When you are ready to deploy, submit a pull request of your changes to [OSO](https://github.com/opensource-observer/oso). -OSO maintainers will work with you to get the code In shape for merging. +OSO maintainers will work with you to get the code in shape for merging. For more details on contributing to OSO, check out [CONTRIBUTING.md](https://github.com/opensource-observer/oso/blob/main/CONTRIBUTING.md). ### Verify deployment -Give our Dagster deployment 24 hours after your pull request merges to reload. +Our Dagster deployment should automatically recognize the asset +after merging your pull request to the main branch. You should be able to find your new asset in the [global asset list](https://dagster.opensource.observer/assets). @@ -30,6 +31,7 @@ it may look like this: ![Dagster deployment](./dagster_deployments.png) + ### Run it! If this is your first time adding an asset, @@ -62,7 +64,9 @@ increased infrastructure instability or unexpected costs. ## Defining a dbt source -In this example, we create a dbt source in `oso/warehouse/dbt/models/` +In order to make the new dataset available to the data pipeline, +you need to add it as a dbt source. +In this example, we create a source in `oso/warehouse/dbt/models/` (see [source](https://github.com/opensource-observer/oso/blob/main/warehouse/dbt/models/ethereum_sources.yml)) for the Ethereum mainnet [public dataset](https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-public-dataset-smart-contract-analytics). @@ -79,7 +83,6 @@ sources: identifier: traces ``` -This will make it clear to OSO dbt users that the data is available for use. 
We can then reference these tables in downstream models
with the `source` macro:

@@ -96,4 +99,4 @@ from {{ source("ethereum", "transactions") }}
 
 - [**SQL Query Guide**](../../integrate/query-data.mdx): run queries against the
   data you just loaded
 - [**Connect OSO to 3rd Party tools**](../../integrate/3rd-party.mdx): explore your data using tools like Hex.tech, Tableau, and Metabase
-- [**Write a dbt model**](../impact-models.md): contribute new impact and data models to our data pipeline
\ No newline at end of file
+- [**Write a dbt model**](../impact-models.md): contribute new impact and data models to our data pipeline
diff --git a/apps/docs/docs/contribute/connect-data/dagster.md b/apps/docs/docs/contribute/connect-data/dagster.md
index 1f21c9ce3..9d875715d 100644
--- a/apps/docs/docs/contribute/connect-data/dagster.md
+++ b/apps/docs/docs/contribute/connect-data/dagster.md
@@ -36,9 +36,10 @@ At a high-level, there are 2 possible pathways:
   easily fit in memory.
 
 Assets should be added to
-`warehouse/oso_dagster/assets/`.
-All assets defined in this directory are automatically
-loaded into Dagster.
+`warehouse/oso_dagster/assets/` and then imported in
+`warehouse/oso_dagster/assets/__init__.py`.
+All assets defined in this module are automatically
+loaded into Dagster from the main branch of the git repository.
 
 For an example of a custom Dagster asset, check out the
 [asset for oss-directory](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets/ossd.py),
diff --git a/apps/docs/docs/contribute/connect-data/gcs.md b/apps/docs/docs/contribute/connect-data/gcs.md
index 2b903e1a0..6e26bfd28 100644
--- a/apps/docs/docs/contribute/connect-data/gcs.md
+++ b/apps/docs/docs/contribute/connect-data/gcs.md
@@ -1,38 +1,92 @@
 ---
-title: 🏗️ Connect via Google Cloud Storage (GCS)
+title: Connect via Google Cloud Storage (GCS)
 sidebar_position: 4
 ---
 
 import NextSteps from "./dagster-config.mdx"
 
-Depending on the data, we may accept data dumps
-into our Google Cloud Storage (GCS).
+We strongly prefer data partners that can provide
+regularly updated, live datasets rather than static snapshots.
+Datasets that use this method require OSO sponsorship
+for storing the data, because we take on the cost
+of converting it into a BigQuery dataset,
+as well as the associated long-term storage costs.
 If you believe your data storage qualifies to be sponsored
 by OSO, please reach out to us on
 [Discord](https://www.opensource.observer/discord).
 
-## Get write access
+If you prefer to handle the data storage yourself, check out the
+[Connect via BigQuery guide](./bigquery/index.md).
 
-Coordinate with the OSO engineering team directly on
+## Schedule periodic dumps to GCS
+
+First, coordinate with the OSO engineering team directly on
 [Discord](https://www.opensource.observer/discord)
 to give your Google service account write permissions
 to our GCS bucket.
 
+Once you have write access, schedule a cron job
+that regularly dumps new time-partitioned data,
+usually at a daily or weekly cadence.
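+
+As a concrete illustration, a daily export job could use the
+google-cloud-storage client library to upload each new file under a
+date-named path. The following is a minimal, hypothetical sketch (the
+`my_dataset` names are placeholders), not a required tool:
+
+```python
+# Hypothetical daily export job; all dataset names are placeholders.
+from datetime import datetime, timezone
+
+from google.cloud import storage
+
+
+def upload_daily_snapshot(local_path: str) -> None:
+    # Date-partitioned paths let the Dagster asset defined below
+    # discover each day's dump.
+    date_prefix = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+    client = storage.Client()
+    bucket = client.bucket("oso-dataset-transfer-bucket")
+    blob = bucket.blob(f"my_dataset/{date_prefix}/data.parquet")
+    blob.upload_from_filename(local_path)
+
+
+if __name__ == "__main__":
+    upload_daily_snapshot("data.parquet")
+```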
+
 ## Defining a Dagster Asset
 
-:::warning
-Coming soon... This section is a work in progress
-and will be likely refactored soon.
-:::
+Next, create a new asset file in
+`warehouse/oso_dagster/assets/`.
+This file should invoke the GCS asset factory.
+For example, you can see this in action for
+[Gitcoin passport scores](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets/gitcoin.py):
+
+```python
+# warehouse/oso_dagster/assets/gitcoin.py
+from ..factories import (
+    interval_gcs_import_asset,
+    SourceMode,
+    Interval,
+    IntervalGCSAsset,
+)
+
+gitcoin_passport_scores = interval_gcs_import_asset(
+    IntervalGCSAsset(
+        key_prefix="gitcoin",
+        name="passport_scores",
+        project_id="opensource-observer",
+        bucket_name="oso-dataset-transfer-bucket",
+        path_base="passport",
+        file_match=r"(?P<interval_timestamp>\d\d\d\d-\d\d-\d\d)/scores.parquet",
+        destination_table="passport_scores",
+        raw_dataset_name="oso_raw_sources",
+        clean_dataset_name="gitcoin",
+        interval=Interval.Daily,
+        mode=SourceMode.Overwrite,
+        retention_days=10,
+        format="PARQUET",
+    ),
+)
+```
 
-To see an example of this in action,
-you can look into our Dagster asset for
-[Gitcoin passport scores](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py).
+For the latest documentation on configuration parameters,
+check out the comments in the
+[GCS factory](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/factories/gcs.py).
+
+In order for our Dagster deployment to recognize this asset,
+you need to import it in
+`warehouse/oso_dagster/assets/__init__.py`.
+
+```python
+from .dbt import *
+from .gitcoin import *
+...
+```
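+
+Going back to the asset definition above, the `file_match` parameter
+is a regular expression whose named group marks the time partition of
+each incoming file. A quick, self-contained illustration of how that
+pattern matches a dated path (the example path is hypothetical):
+
+```python
+import re
+
+file_match = r"(?P<interval_timestamp>\d\d\d\d-\d\d-\d\d)/scores.parquet"
+
+match = re.search(file_match, "passport/2024-06-30/scores.parquet")
+assert match is not None
+print(match.group("interval_timestamp"))  # prints: 2024-06-30
+```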
 
 For more details on defining Dagster assets, see the
 [Dagster tutorial](https://docs.dagster.io/tutorial).
 
-## GCS import examples in OSO
+### GCS examples in OSO
+
+In the
+[OSO monorepo](https://github.com/opensource-observer/oso),
+you will find a few examples of using the GCS asset factory:
 
 - [Superchain data](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py)
 - [Gitcoin Passport scores](https://github.com/opensource-observer/oso/blob/main/warehouse/oso_dagster/assets.py)
diff --git a/boop.txt b/boop.txt
deleted file mode 100644
index b1b3d95ff..000000000
--- a/boop.txt
+++ /dev/null
@@ -1,40 +0,0 @@
-01:07:33 Running with dbt=1.7.9
-01:07:33 Registered adapter: bigquery=1.7.6
-01:07:33 Found 112 models, 33 sources, 0 exposures, 0 metrics, 463 macros, 0 groups, 0 semantic models
-01:07:33 
-01:07:34 Concurrency: 32 threads (target='production')
-01:07:34 
-01:07:34 Compiled node 'playground__ossd_collections' is:
-
-
-with __dbt__cte__playground__project_filter as (
-
-
-SELECT * FROM UNNEST([
-    "gitcoin",
-    "opensource-observer",
-    "uniswap",
-    "velodrome",
-    "ethereum-attestation-service",
-    "zora",
-    "libp2p",
-    "rabbit-hole",
-    "safe-global",
-    "aave"
-]) as project_slug
-), filtered_collections as (
-  select distinct
-    collections.collection_name as `name`,
-    collections.sync_time as `sync_time`
-  from `opensource-observer`.`oso`.`stg_ossd__current_collections` as collections
-  cross join UNNEST(collections.projects) as project_name
-  inner join `opensource-observer`.`oso`.`stg_ossd__current_projects` as projects
-    on projects.project_name = project_name
-  where project_name IN (select * from __dbt__cte__playground__project_filter)
-)
-
-select collections.*
-from `opensource-observer`.`oso`.`collections_ossd` as collections
-inner join filtered_collections as filtered
-  on filtered.name = collections.name
-  and collections._cq_sync_time = filtered.sync_time
diff --git a/warehouse/oso_dagster/factories/gcs.py b/warehouse/oso_dagster/factories/gcs.py
index dfb11da84..63332f6b1 100644
--- a/warehouse/oso_dagster/factories/gcs.py
+++ b/warehouse/oso_dagster/factories/gcs.py
@@ -25,16 +25,18 @@
 from .common import AssetFactoryResponse
 from ..utils.bq import ensure_dataset, DatasetOptions
 
-
+# An enum for specifying time intervals
 class Interval(Enum):
     Hourly = 0
     Daily = 1
     Weekly = 2
     Monthly = 3
 
-
+# Configures how we should handle incoming data
 class SourceMode(Enum):
+    # Add new time-partitioned data incrementally
    Incremental = 0
+    # Overwrite the entire dataset on each import
    Overwrite = 1
 
 
@@ -42,21 +44,31 @@ class SourceMode(Enum):
 class BaseGCSAsset:
+    # Dagster asset name
     name: str
+    # Dagster key prefix for namespacing the asset
     key_prefix: Optional[str | Sequence[str]] = ""
+    # GCP project ID (usually opensource-observer)
     project_id: str
+    # GCS bucket name
     bucket_name: str
+    # Path prefix within the bucket to search for files
     path_base: str
+    # Regex for incoming files
     file_match: str
+    # Table name nested under the dataset
     destination_table: str
+    # BigQuery temporary staging dataset for imports
     raw_dataset_name: str
+    # BigQuery destination dataset
     clean_dataset_name: str
+    # Format of incoming files (PARQUET preferred)
     format: str = "CSV"
+    # Extra keyword arguments passed through to the Dagster asset
     asset_kwargs: dict = field(default_factory=lambda: {})
 
 
 @dataclass(kw_only=True)
 class IntervalGCSAsset(BaseGCSAsset):
+    # How often we should run this job
     interval: Interval
+    # Incremental or overwrite
     mode: SourceMode
+    # Retention time before deleting GCS files
     retention_days: int
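+
+    # A hedged usage sketch tying these parameters together; every name
+    # below is a hypothetical placeholder (see
+    # warehouse/oso_dagster/assets/gitcoin.py for a real invocation):
+    #
+    #   my_dataset = interval_gcs_import_asset(
+    #       IntervalGCSAsset(
+    #           key_prefix="my_org",
+    #           name="my_dataset",
+    #           project_id="opensource-observer",
+    #           bucket_name="oso-dataset-transfer-bucket",
+    #           path_base="my_dataset",
+    #           file_match=r"(?P<interval_timestamp>\d\d\d\d-\d\d-\d\d)/data.csv",
+    #           destination_table="my_dataset",
+    #           raw_dataset_name="oso_raw_sources",
+    #           clean_dataset_name="my_org",
+    #           interval=Interval.Weekly,
+    #           mode=SourceMode.Incremental,
+    #           retention_days=30,
+    #       ),
+    #   )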