diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 4be96fb92d93a..31ef145987ed5 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -1507,9 +1507,9 @@ def _output_to_source_asset(self, output_name: str) -> SourceAsset: return SourceAsset( key=key, - metadata=output_def.metadata, + metadata=spec.metadata, io_manager_key=output_def.io_manager_key, - description=output_def.description, + description=spec.description, resource_defs=self.resource_defs, partitions_def=self.partitions_def, group_name=spec.group_name, diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_input_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_input_manager.py index bffabed3ca42f..1e8b4f4e8a418 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_input_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_input_manager.py @@ -6,6 +6,7 @@ AssetKey, DagsterInstance, DagsterInvalidDefinitionError, + DataVersion, In, InputManager, IOManager, @@ -16,6 +17,7 @@ io_manager, job, materialize, + observable_source_asset, op, resource, ) @@ -367,6 +369,32 @@ def handle_output(self, context, obj): ... assert output._get_output_for_handle("downstream", "result") == 3 # noqa: SLF001 +def test_input_manager_with_observable_source_asset() -> None: + fancy_metadata = {"foo": "bar", "baz": 1.23} + + @observable_source_asset(metadata=fancy_metadata) + def upstream(): + return DataVersion("1") + + @asset(ins={"upstream": AssetIn(input_manager_key="special_io_manager")}) + def downstream(upstream) -> int: + return upstream + 1 + + class MyIOManager(IOManager): + def load_input(self, context) -> int: + assert context.upstream_output is not None + assert context.upstream_output.asset_key == AssetKey(["upstream"]) + # the process of converting assets to source assets leaves an extra metadata entry + # of dagster/io_manager_key in the dictionary, so we can't use simple equality here + for k, v in fancy_metadata.items(): + assert context.upstream_output.definition_metadata[k] == v + return 2 + + def handle_output(self, context, obj) -> None: ... + + materialize(assets=[upstream, downstream], resources={"special_io_manager": MyIOManager()}) + + def test_input_manager_with_assets_no_default_io_manager(): """Tests loading an upstream asset with an input manager when the downstream asset also uses a custom io manager. Fixes a bug where dagster expected the io_manager key to be provided.