Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][dagster-airbyte] RFC: Prototype airbyte_assets asset decorator #22847

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

maximearmstrong
Copy link
Contributor

@maximearmstrong maximearmstrong commented Jul 3, 2024

Summary & Motivation

Current state

Currently, users can create their Airbyte assets in three different ways in Dagster.

  • load_assets_from_airbyte_instance
    • Gets all connections from an Airbyte assets and creates Dagster assets for each of them
    • Uses CacheableAssetsDefinition
  • load_assets_from_airbyte_project
    • Gets all connections from an Octavia project directory and local YAML files. Creates Dagster assets for each of the connections.
    • Uses CacheableAssetsDefinition
  • build_airbyte_assets
    • Creates a multi-asset for a single Airbyte connection.
    • Does not use CacheableAssetsDefinition, but multi-asset

Looks like this, assuming no overlap in Airbyte connection between approaches:

from dagster import job, EnvVar
from dagster_airbyte import AirbyteResource

my_airbyte_resource = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
    # If using basic auth
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)

my_connection_assets = with_resources(
    build_airbyte_assets(
        connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
        destination_tables=["releases", "tags", "teams"],
    ),
    {"airbyte": airbyte_instance},
)

my_project_assets  = with_resources(
    [load_assets_from_airbyte_project(project_dir="path/to/airbyte/project")],
    {"airbyte": airbyte_instance},
)

my_instance_assets = load_assets_from_airbyte_instance(
    airbyte = my_airbyte_resource,
)

defs = Definitions(
    assets=[my_connection_assets, my_project_assets, my_instance_assets],
    resources={"airbyte": my_airbyte_resource},
)

Propositions

  1. Refactor build_airbyte_assets in an asset decorator, airbyte_assets, to allow more flexibility with the resource.
from dagster import job, EnvVar
from dagster_airbyte import AirbyteResource, airbyte_assets

my_airbyte_resource = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
    # If using basic auth
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)

@airbyte_assets(
    connection_id="87b7fe85-a22c-420e-8d74-b30e7ede77df",
    destination_tables=["releases", "tags", "teams"],
)
def my_airbyte_assets(context, airbyte: AirbyteResource):
    # Do something with my resource
    yield from airbyte.materialize_from_sync(context=context)

defs = Definitions(
    assets=[my_airbyte_assets],
    resources={"airbyte": my_airbyte_resource},
)

In this approach we would keep load_assets_from_airbyte_instance and load_assets_from_airbyte_project as-is.

  1. Use one asset decorator to manage all the connections.
  • Select the connections/assets to materialize
  • Prepare the connections in advance, at run time during development and at build time during deployement. Create the assets based on these connections.
  • Remove the Cacheable assets
from dagster import job, EnvVar
from dagster_airbyte import (
    AirbyteInstanceProject, 
    AirbyteLocalProject, 
    AirbyteResource, 
    airbyte_assets
)

my_airbyte_resource = AirbyteResource(
    host=EnvVar("AIRBYTE_HOST"),
    port=EnvVar("AIRBYTE_PORT"),
    # If using basic auth
    username=EnvVar("AIRBYTE_USERNAME"),
    password=EnvVar("AIRBYTE_PASSWORD"),
)

airbyte_instance_project = AirbyteInstanceProject(my_airbyte_resource)
airbyte_instance_project.prepare_if_dev()

airbyte_local_project = AirbyteLocalProject("/path/to/local/project", my_airbyte_resource)
airbyte_local_project.prepare_if_dev()

@airbyte_assets(
    connections=[
        airbyte_instance_project.connections,
        airbyte_local_project.connections,
        # other connections
    ],
)
def my_airbyte_assets(context, airbyte: AirbyteResource):
    # Do something with my resource
    yield from airbyte.materialize_from_sync(context=context)

defs = Definitions(
    assets=[my_airbyte_assets],
    resources={"airbyte": my_airbyte_resource},
)

In this approach we would keep load_assets_from_airbyte_instance and load_assets_from_airbyte_project as-is.

Todo

  • [] Implement the translator to replace the function passed to load_assets_from_airbyte_instance and load_assets_from_airbyte_project
  • [] Implement a AirbyteConfigurationProject and AirbyteInstanceProject to reflect the project and connections

How I Tested These Changes

TODO

Copy link
Contributor Author

This stack of pull requests is managed by Graphite. Learn more about stacking.

Join @maximearmstrong and the rest of your teammates on Graphite Graphite

@maximearmstrong maximearmstrong changed the title [WIP][dagster-airbyte] RFC: Prototype dagster_airbyte asset decorator [WIP][dagster-airbyte] RFC: Prototype airbyte_assets asset decorator Jul 3, 2024
@maximearmstrong maximearmstrong force-pushed the maxime/ds-323/prototype-dagster-airbyte branch from 7be710d to a20e8ac Compare July 9, 2024 21:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant