Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14/n][dagster-airbyte] Implement materialization method for AirbyteCloudWorkspace #26559

Open
wants to merge 2 commits into
base: maxime/rework-airbyte-cloud-13
Choose a base branch
from

Conversation

maximearmstrong
Copy link
Contributor

@maximearmstrong maximearmstrong commented Dec 18, 2024

Summary & Motivation

This PR implements AirbyteCloudWorkspace.sync_and_poll, the materialization method for Airbyte Cloud assets. This method:

  • calls AirbyteCloudClient.sync_and_poll
  • takes the AirbyteOutput returned by AirbyteCloudClient.sync_and_poll and generates the asset materializations
  • yields MaterializeResult for each expected asset and AssetMaterialization for each unexpected asset
    • a connection table that was not in the connection at definitions loading time can be in the AirbyteOutput. Eg. the table was added after definitions loading time and before sync.
  • logs a warning for each unmaterialized table
    • a connection table can be created at definitions loading time, but can be missing in the AirbyteOutput. Eg. the table was deleted after definitions loading time and before sync.

Can be leveraged like:

from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    name="airbyte_connection_id",
    group_name="airbyte_connection_id",
    workspace=airbyte_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)

How I Tested These Changes

Additional tests with BK

Changelog

[dagster-airbyte] Airbyte Cloud assets can now be materialized using the AirbyteCloudWorkspace.sync_and_poll(…) method in the definition of a @airbyte_assets decorator.

@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-13 branch from 71c99c0 to 3641abe Compare December 18, 2024 01:07
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-14 branch from 7d77d65 to c717f03 Compare December 18, 2024 01:07
@maximearmstrong maximearmstrong changed the title [14/n][dagster-airbyte] Implement AirbyteCloudWorkspace.sync_and_poll [14/n][dagster-airbyte] Implement materialization method for AirbyteCloudWorkspace Dec 18, 2024
@maximearmstrong maximearmstrong marked this pull request as ready for review December 18, 2024 01:22
@maximearmstrong maximearmstrong self-assigned this Dec 18, 2024
specs=[
spec
spec.merge_attributes(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add the translator to the metadata to reuse it in the materialization process.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant