diff --git a/HISTORY.rst b/HISTORY.rst index e1f5c31..199f22a 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -8,6 +8,10 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +unreleased +--------------------------- ++ ``-s`` pytest flag is now supported, streaming stdout/stderr as the tests run + version 2.1.0 --------------------------- + Python version 3.7 support is dropped because it is deprecated. Python diff --git a/src/pytest_workflow/content_tests.py b/src/pytest_workflow/content_tests.py index 2770188..51ba826 100644 --- a/src/pytest_workflow/content_tests.py +++ b/src/pytest_workflow/content_tests.py @@ -72,7 +72,7 @@ def check_content(strings: Iterable[str], class ContentTestCollector(pytest.Collector): def __init__(self, name: str, parent: pytest.Collector, - filepath: Path, + filepath: Optional[Path], content_test: ContentTest, workflow: Workflow, content_name: Optional[str] = None): @@ -105,6 +105,11 @@ def find_strings(self): When a file we test is not produced, we save the FileNotFoundError so we can give an accurate repr_failure.""" self.workflow.wait() + + if self.filepath is None: + self.file_not_found = True + return + strings_to_check = (self.content_test.contains + self.content_test.must_not_contain) patterns_to_check = (self.content_test.contains_regex + @@ -195,6 +200,10 @@ def __init__(self, parent: ContentTestCollector, string: str, name = f"{contain} '{string}'" super().__init__(name, parent=parent) self.parent: ContentTestCollector = parent # explicitly declare type + assert self.parent.filepath is not None, ( + f"Invalid test {content_name}, unknown file to validate. " + "This can happen if you specify stdout/stderr tests while " + "specifying a different capture method.") self.should_contain = should_contain self.string = string self.content_name = content_name diff --git a/src/pytest_workflow/plugin.py b/src/pytest_workflow/plugin.py index 142c715..0b10c4a 100644 --- a/src/pytest_workflow/plugin.py +++ b/src/pytest_workflow/plugin.py @@ -409,7 +409,8 @@ def queue_workflow(self): workflow = Workflow(command=self.workflow_test.command, cwd=tempdir, name=self.workflow_test.name, - desired_exit_code=self.workflow_test.exit_code) + desired_exit_code=self.workflow_test.exit_code, + capture=self.config.getoption("capture")) # Add the workflow to the workflow queue. self.config.workflow_queue.put(workflow) diff --git a/src/pytest_workflow/workflow.py b/src/pytest_workflow/workflow.py index 748b024..eec34f7 100644 --- a/src/pytest_workflow/workflow.py +++ b/src/pytest_workflow/workflow.py @@ -27,7 +27,11 @@ import threading import time from pathlib import Path -from typing import List, Optional +from typing import List, Literal, Optional + + +# pytest does not export this type +CaptureMethod = Literal["fd", "sys", "no", "tee-sys"] class Workflow(object): @@ -36,7 +40,8 @@ def __init__(self, command: str, cwd: Optional[Path] = None, name: Optional[str] = None, - desired_exit_code: int = 0): + desired_exit_code: int = 0, + capture: CaptureMethod = "fd"): """ Initiates a workflow object :param command: The string that represents the command to be run @@ -48,24 +53,35 @@ def __init__(self, if command == "": raise ValueError("command can not be an empty string") self.command = command + # Always ensure a name. command.split()[0] can't fail because we tested # for emptiness. self.name = name or command.split()[0] self.cwd = cwd or Path() + # For long running workflows it is best to save the stdout and stderr # to a file which can be checked with ``tail -f``. # stdout and stderr will be written to a tempfile if no CWD is given # to prevent clutter created when testing. - self.stdout_file = ( - Path(tempfile.NamedTemporaryFile(prefix=self.name, - suffix=".out").name) - if cwd is None - else self.cwd / Path("log.out")) - self.stderr_file = ( - Path(tempfile.NamedTemporaryFile(prefix=self.name, - suffix=".err").name) - if cwd is None - else self.cwd / Path("log.err")) + supported_capture_methods = ["no", "fd"] + if capture not in supported_capture_methods: + raise ValueError("only capture methods " + f"{supported_capture_methods} are supported, " + f"found {capture}") + self.capture = capture + self.stdout_file = None + self.stderr_file = None + if self.capture != "no": + self.stdout_file = ( + Path(tempfile.NamedTemporaryFile(prefix=self.name, + suffix=".out").name) + if cwd is None + else self.cwd / Path("log.out")) + self.stderr_file = ( + Path(tempfile.NamedTemporaryFile(prefix=self.name, + suffix=".err").name) + if cwd is None + else self.cwd / Path("log.err")) self._popen: Optional[subprocess.Popen] = None self._started = False self.errors: List[Exception] = [] @@ -79,9 +95,12 @@ def start(self): # is started from multiple threads. with self.start_lock: if not self._started: + stdout_h = None + stderr_h = None try: - stdout_h = self.stdout_file.open('wb') - stderr_h = self.stderr_file.open('wb') + if self.capture != "no": + stdout_h = self.stdout_file.open('wb') + stderr_h = self.stderr_file.open('wb') sub_process_args = shlex.split(self.command) self._popen = subprocess.Popen( sub_process_args, stdout=stdout_h, @@ -91,8 +110,10 @@ def start(self): self.errors.append(error) finally: self._started = True - stdout_h.close() - stderr_h.close() + if stdout_h is not None: + stdout_h.close() + if stderr_h is not None: + stderr_h.close() else: raise ValueError("Workflows can only be started once") @@ -148,12 +169,20 @@ def matching_exitcode(self) -> bool: @property def stdout(self) -> bytes: self.wait() + if self.stdout_file is None: + raise ValueError( + f"Stdout not available with capture={self.capture}" + ) with self.stdout_file.open('rb') as stdout: return stdout.read() @property def stderr(self) -> bytes: self.wait() + if self.stderr_file is None: + raise ValueError( + f"Stdout not available with capture={self.capture}" + ) with self.stderr_file.open('rb') as stderr: return stderr.read() diff --git a/tests/test_fail_messages.py b/tests/test_fail_messages.py index 4640bd0..0e1cb35 100644 --- a/tests/test_fail_messages.py +++ b/tests/test_fail_messages.py @@ -179,3 +179,19 @@ def test_messages_exitcode(test: str, message: str, pytester): # possible due to multiple levels of process launching. result = pytester.runpytest("-v", "--sb", "5") assert message in result.stdout.str() + + +def test_invalid_test_capture(pytester): + test_yml_contents = """\ + - name: tee test + command: bash -c "echo foo" + stdout: + contains: + - foo + """ + pytester.makefile(".yml", textwrap.dedent(test_yml_contents)) + result = pytester.runpytest("-v", "-s") + assert ( + "Invalid test 'tee test': stdout, unknown file to validate" + in result.stdout.str() + ) diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 4634427..3ef87a7 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -105,3 +105,17 @@ def test_workflow_matching_exit_code(): workflow2 = Workflow("grep", desired_exit_code=2) workflow2.run() assert workflow2.matching_exitcode() + + +def test_capture_unsupported(): + with pytest.raises(ValueError) as error: + Workflow("echo moo", capture="tee-sys") + error.match("only capture methods") + + +def test_capture_no(): + workflow = Workflow("echo moo", capture="no") + workflow.run() + with pytest.raises(ValueError) as error: + workflow.stdout + error.match("Stdout not available")