diff --git a/src/sghi/etl/core.py b/src/sghi/etl/core.py index 6b76581..4ed9372 100644 --- a/src/sghi/etl/core.py +++ b/src/sghi/etl/core.py @@ -24,6 +24,16 @@ """Raw Data Type.""" +# ============================================================================= +# HELPERS +# ============================================================================= + + +def _noop() -> None: + """Do nothing.""" + ... + + # ============================================================================= # BASE INTERFACES # ============================================================================= @@ -272,3 +282,32 @@ def sink_factory(self) -> Callable[[], Sink[_PDT]]: ``Sink`` instance associated with this workflow. """ ... + + @property + def prolog(self) -> Callable[[], None]: + """A callable to be executed at the beginning of the workflow. + + If the execution of this callable fails, i.e. raises an exception, then + the main workflow is never executed, only the callable returned by the + :attr:`epilog` property is. + This can be used to validate the loaded configuration, setting up + certain resources before the workflow execution starts, etc. + The default implementation of this property returns a callable that + does nothing. + + .. versionadded:: 1.2.0 + """ + return _noop + + @property + def epilog(self) -> Callable[[], None]: + """A callable to be executed at the end of the workflow. + + This is always executed regardless of whether the :meth:`prolog` or + workflow completed successfully or not. + The default implementation of this property returns a callable that + does nothing. + + .. versionadded:: 1.2.0 + """ + return _noop diff --git a/test/sghi/etl/core_tests.py b/test/sghi/etl/core_tests.py index cafb038..65f139f 100644 --- a/test/sghi/etl/core_tests.py +++ b/test/sghi/etl/core_tests.py @@ -2,7 +2,7 @@ from __future__ import annotations -from collections.abc import Iterable +from collections.abc import Callable, Iterable from dataclasses import dataclass, field from unittest import TestCase @@ -10,7 +10,8 @@ from typing_extensions import override from sghi.disposable import not_disposed -from sghi.etl.core import Processor, Sink, Source +from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition +from sghi.utils import type_fqn # ============================================================================= # TESTS HELPERS @@ -19,7 +20,7 @@ @dataclass(slots=True) class IntsSupplier(Source[Iterable[int]]): - """A simple :class:`Source` that supplies integers.""" + """A :class:`Source` that supplies integers.""" max_ints: int = field(default=10) _is_disposed: bool = field(default=False, init=False) @@ -64,14 +65,13 @@ def dispose(self) -> None: class CollectToList(Sink[Iterable[str]]): """A :class:`Sink` that collects all the values it receives in a list.""" - collection_target: list[str] = field() + collection_target: list[str] = field(default_factory=list) _is_disposed: bool = field(default=False, init=False) @not_disposed @override def drain(self, processed_data: Iterable[str]) -> None: - for value in processed_data: - self.collection_target.append(value) + self.collection_target.extend(processed_data) @property @override @@ -83,6 +83,43 @@ def dispose(self) -> None: self._is_disposed = True +@dataclass(frozen=True, slots=True) +class TestWorkflowDefinition(WorkflowDefinition[Iterable[int], Iterable[str]]): + """A simple :class:`WorkflowDefinition` implementation.""" + + @property + @override + def description(self) -> str | None: + return None + + @property + @override + def id(self) -> str: + return "test" + + @property + @override + def name(self) -> str: + return "Test Workflow" + + @property + @override + def processor_factory( + self, + ) -> Callable[[], Processor[Iterable[int], Iterable[str]]]: + return IntsToStrings + + @property + @override + def sink_factory(self) -> Callable[[], Sink[Iterable[str]]]: + return CollectToList + + @property + @override + def source_factory(self) -> Callable[[], Source[Iterable[int]]]: + return IntsSupplier + + # ============================================================================= # TESTS # ============================================================================= @@ -91,7 +128,7 @@ def dispose(self) -> None: class TestSource(TestCase): """Tests for the :class:`sghi.etl.core.Source` interface. - Tests for the default method implementations on the `Source` interface. + Tests for the default method implementations of the ``Source`` interface. """ def test_invoking_source_as_a_callable_returns_expected_value( @@ -111,13 +148,15 @@ def test_invoking_source_as_a_callable_returns_expected_value( IntsSupplier(max_ints=max_ints) as instance1, IntsSupplier(max_ints=max_ints) as instance2, ): + # noinspection PyArgumentList assert list(instance1.draw()) == list(instance2()) == [0, 1, 2, 3] class TestProcessor(TestCase): """Tests for the :class:`sghi.etl.core.Processor` interface. - Tests for the default method implementations on the `Processor` interface. + Tests for the default method implementations of the ``Processor`` + interface. """ @override @@ -191,7 +230,7 @@ def test_invoking_the_process_method_raises_a_deprecation_waring( class TestSink(TestCase): """Tests for the :class:`sghi.etl.core.Processor` interface. - Tests for the default method implementations on the `Sink` interface. + Tests for the default method implementations of the ``Sink`` interface. """ @override @@ -231,3 +270,57 @@ def test_invoking_sink_as_a_callable_returns_expected_value(self) -> None: instance2(processed_data) assert collect1 == collect2 == ["0", "1", "2", "3", "4"] + + +class TestWorkflow(TestCase): + """Tests for the :class:`sghi.etl.core.WorkflowDefinition` interface. + + Tests for the default method implementations of the ``WorkflowDefinition`` + interface. + """ + + @override + def setUp(self) -> None: + super().setUp() + self._instance: WorkflowDefinition[Iterable[int], Iterable[str]] + self._instance = TestWorkflowDefinition() + + def test_epilog_return_value(self) -> None: + """The default implementation of + :meth:`~sghi.etl.core.WorkflowDefinition.epilog` should return a + callable that does nothing. + """ # noqa: D205 + epilog: Callable[[], None] = self._instance.epilog + assert callable(epilog) + + try: + epilog() + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + f"The following unexpected error: '{exp!r}', was raised when " + "invoking the callable returned by the default implementation " + f"of the '{type_fqn(WorkflowDefinition)}.epilog' property. No " + "errors should be raised by the callable returned by the " + "default implementation of the said property." + ) + pytest.fail(reason=_fail_reason) + + def test_prolog_return_value(self) -> None: + """The default implementation of + :meth:`~sghi.etl.core.WorkflowDefinition.prolog` should return a + callable that does nothing. + """ # noqa: D205 + prolog: Callable[[], None] = self._instance.prolog + assert callable(prolog) + + try: + prolog() + except Exception as exp: # noqa: BLE001 + _fail_reason: str = ( + f"The following unexpected error: '{exp!r}', was raised when " + "invoking the callable returned by the default implementation " + f"of the '{type_fqn(WorkflowDefinition)}.prolog' property. No " + "errors should be raised by the callable returned by the " + "default implementation of the said property." + ) + pytest.fail(reason=_fail_reason)