From aaf24575a867a373e938eaef516404137aec752d Mon Sep 17 00:00:00 2001 From: John Vandenberg Date: Thu, 15 Aug 2019 00:46:18 +0700 Subject: [PATCH] Convert stdio_mgr to be tuple StdioManager StdioManager is able to behave like a tuple, yielding stdin, stdout and stderr, or like an object that has attributes stdin, stdout and stderr. On Python 3.6+, StdioManager is a AbstractContextManager. stdio_mgr is an alias for StdioManager. Closes https://github.com/bskinn/stdio-mgr/issues/10 --- src/stdio_mgr/__init__.py | 1 + src/stdio_mgr/stdio_mgr.py | 112 +++++++++++++++++++++++++----------- tests/test_stdiomgr_base.py | 89 +++++++++++++++++++++++++++- 3 files changed, 167 insertions(+), 35 deletions(-) diff --git a/src/stdio_mgr/__init__.py b/src/stdio_mgr/__init__.py index fb12a4a..a7879dc 100644 --- a/src/stdio_mgr/__init__.py +++ b/src/stdio_mgr/__init__.py @@ -27,4 +27,5 @@ """ from stdio_mgr.stdio_mgr import stdio_mgr # noqa: F401 +from stdio_mgr.stdio_mgr import StdioManager # noqa: F401 from stdio_mgr.version import __version__ # noqa: F401 diff --git a/src/stdio_mgr/stdio_mgr.py b/src/stdio_mgr/stdio_mgr.py index aae301f..e4b09dd 100644 --- a/src/stdio_mgr/stdio_mgr.py +++ b/src/stdio_mgr/stdio_mgr.py @@ -27,9 +27,16 @@ """ import sys -from contextlib import contextmanager, ExitStack, suppress +from contextlib import ExitStack, suppress from io import BufferedRandom, BufferedReader, BytesIO, TextIOBase, TextIOWrapper +# AbstractContextManager was introduced in Python 3.6 +# and may be used with typing.ContextManager. +try: + from contextlib import AbstractContextManager +except ImportError: # pragma: no cover + AbstractContextManager = object + import attr @@ -240,9 +247,25 @@ class SafeCloseTeeStdin(_SafeCloseIOBase, TeeStdin): """ -@contextmanager -def stdio_mgr(in_str="", close=True): - r"""Subsitute temporary text buffers for `stdio` in a managed context. +class _MultiCloseContextManager(tuple, AbstractContextManager): + """Manage multiple closable members of a tuple.""" + + def __enter__(self): + """Enter context of all members.""" + with ExitStack() as stack: + all(map(stack.enter_context, self)) + + self._close_files = stack.pop_all().close + + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit context, closing all members.""" + self._close_files() + + +class StdioManager(_MultiCloseContextManager): + r"""Substitute temporary text buffers for `stdio` in a managed context. Context manager. @@ -278,34 +301,57 @@ def stdio_mgr(in_str="", close=True): initially empty. """ - if close: - out_cls = SafeCloseRandomTextIO - in_cls = SafeCloseTeeStdin - else: - out_cls = RandomTextIO - in_cls = TeeStdin - - old_stdin = sys.stdin - old_stdout = sys.stdout - old_stderr = sys.stderr - - with ExitStack() as stack: - new_stdout = stack.enter_context(out_cls()) - new_stderr = stack.enter_context(out_cls()) - new_stdin = stack.enter_context(in_cls(new_stdout, in_str)) - - close_files = stack.pop_all().close - - sys.stdin = new_stdin - sys.stdout = new_stdout - sys.stderr = new_stderr - - try: - yield new_stdin, new_stdout, new_stderr - finally: - sys.stdin = old_stdin - sys.stdout = old_stdout - sys.stderr = old_stderr + def __new__(cls, in_str="", close=True): + """Instantiate new context manager that emulates namedtuple.""" if close: - close_files() + out_cls = SafeCloseRandomTextIO + in_cls = SafeCloseTeeStdin + else: + out_cls = RandomTextIO + in_cls = TeeStdin + + stdout = out_cls() + stderr = out_cls() + stdin = in_cls(stdout, in_str) + + self = super(StdioManager, cls).__new__(cls, [stdin, stdout, stderr]) + + self._close = close + + return self + + @property + def stdin(self): + """Return capturing stdin stream.""" + return self[0] + + @property + def stdout(self): + """Return capturing stdout stream.""" + return self[1] + + @property + def stderr(self): + """Return capturing stderr stream.""" + return self[2] + + def __enter__(self): + """Enter context, replacing sys stdio objects with capturing streams.""" + self._prior_streams = (sys.stdin, sys.stdout, sys.stderr) + + super().__enter__() + + (sys.stdin, sys.stdout, sys.stderr) = self + + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit context, closing files and restoring state of sys module.""" + (sys.stdin, sys.stdout, sys.stderr) = self._prior_streams + + if self._close: + super().__exit__(exc_type, exc_value, traceback) + + +stdio_mgr = StdioManager diff --git a/tests/test_stdiomgr_base.py b/tests/test_stdiomgr_base.py index ae73686..ec7d25d 100644 --- a/tests/test_stdiomgr_base.py +++ b/tests/test_stdiomgr_base.py @@ -26,13 +26,65 @@ """ +import collections.abc import io import sys import warnings +# AbstractContextManager was introduced in Python 3.6 +try: + from contextlib import AbstractContextManager +except ImportError: + AbstractContextManager = object + import pytest -from stdio_mgr import stdio_mgr +from stdio_mgr import stdio_mgr, StdioManager + + +def test_context_manager_instance(): + """Confirm StdioManager instance is a tuple and registered context manager.""" + cm = StdioManager() + + assert isinstance(cm, tuple) + + value_list = list(cm) + + assert isinstance(cm, collections.abc.Sequence) + assert isinstance(cm, AbstractContextManager) + assert not isinstance(cm, collections.abc.MutableSequence) + assert all(isinstance(item, io.TextIOWrapper) for item in cm) + + # Check copies are equal + assert list(cm) == value_list + + +def test_context_manager_instance_with(): + """Confirm StdioManager works in with.""" + with StdioManager() as cm: + assert isinstance(cm, tuple) + + inner_value_list = list(cm) + + assert isinstance(cm, collections.abc.Sequence) + assert isinstance(cm, AbstractContextManager) + assert not isinstance(cm, collections.abc.MutableSequence) + assert all(isinstance(item, io.TextIOWrapper) for item in cm) + # Check copies are equal + assert list(cm) == inner_value_list + + # Still equal + assert list(cm) == inner_value_list + + +def test_instance_capture_stdout(convert_newlines): + """Confirm object stdout capture.""" + with stdio_mgr() as cm: + s = "test str" + print(s) + + # 'print' automatically adds a newline + assert convert_newlines(s + "\n") == cm.stdout.getvalue() def test_capture_stdout(convert_newlines): @@ -72,6 +124,17 @@ def test_capture_stderr_print(convert_newlines): assert convert_newlines(w + "\n") in e.getvalue() +def test_capture_instance_stderr_print(convert_newlines): + """Confirm object capture of stderr print.""" + with stdio_mgr() as cm: + w = "This is a warning" + + print(w, file=sys.stderr) + + # Warning text comes at the end of a line; newline gets added + assert convert_newlines(w + "\n") in cm.stderr.getvalue() + + def test_capture_stderr_warn(convert_newlines, skip_warnings): """Confirm stderr capture of warnings.warn.""" if skip_warnings: @@ -103,6 +166,23 @@ def test_default_stdin(convert_newlines): assert convert_newlines(in_str[:-1]) == out_str +def test_capture_instance_stdin(convert_newlines): + """Confirm object stdin.""" + in_str = "This is a test string.\n" + + with stdio_mgr(in_str) as cm: + assert in_str == cm.stdin.getvalue() + + out_str = input() + + # TeeStdin tees the stream contents, *including* the newline, + # to the managed stdout + assert convert_newlines(in_str) == cm.stdout.getvalue() + + # 'input' strips the trailing newline before returning + assert convert_newlines(in_str[:-1]) == out_str + + def test_managed_stdin(convert_newlines): """Confirm stdin populate within context.""" str1 = "This is a test string." @@ -145,11 +225,16 @@ def test_repeated_use(convert_newlines): def test_noop(): - """Confirm state is restored after context.""" + """Confirm sys module state is restored after use.""" real_sys_stdio = (sys.stdin, sys.stdout, sys.stderr) + stdio_mgr() assert (sys.stdin, sys.stdout, sys.stderr) == real_sys_stdio + with stdio_mgr(): + pass + assert (sys.stdin, sys.stdout, sys.stderr) == real_sys_stdio + def test_exception(): """Confirm state is restored after an exception during context."""