Skip to content

Commit

Permalink
fix(spy): prefix internal properties with _decoy (#150)
Browse files Browse the repository at this point in the history
Closes #144 by reducing the risk of attribute collision
  • Loading branch information
mcous authored Dec 23, 2022
1 parent 6481543 commit 6a6868a
Showing 1 changed file with 45 additions and 37 deletions.
82 changes: 45 additions & 37 deletions decoy/spy.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ def __init__(
spy_creator: "SpyCreator",
) -> None:
"""Initialize a BaseSpy from a call handler and an optional spec object."""
super().__setattr__("_core", core)
super().__setattr__("_call_handler", call_handler)
super().__setattr__("_spy_creator", spy_creator)
super().__setattr__("_spy_children", {})
super().__setattr__("_spy_property_values", {})
super().__setattr__("__signature__", self._core.signature)
super().__setattr__("_decoy_spy_core", core)
super().__setattr__("_decoy_spy_call_handler", call_handler)
super().__setattr__("_decoy_spy_creator", spy_creator)
super().__setattr__("_decoy_spy_children", {})
super().__setattr__("_decoy_spy_property_values", {})
super().__setattr__("__signature__", self._decoy_spy_core.signature)

@property # type: ignore[misc]
def __class__(self) -> Any:
"""Ensure Spy can pass `instanceof` checks."""
return self._core.class_type or type(self)
return self._decoy_spy_core.class_type or type(self)

def __enter__(self) -> Any:
"""Allow a spy to be used as a context manager."""
enter_spy = self._get_or_create_child_spy("__enter__")
enter_spy = self._decoy_spy_get_or_create_child_spy("__enter__")
return enter_spy()

def __exit__(
Expand All @@ -56,12 +56,14 @@ def __exit__(
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Allow a spy to be used as a context manager."""
exit_spy = self._get_or_create_child_spy("__exit__")
exit_spy = self._decoy_spy_get_or_create_child_spy("__exit__")
return cast(Optional[bool], exit_spy(exc_type, exc_value, traceback))

async def __aenter__(self) -> Any:
"""Allow a spy to be used as an async context manager."""
enter_spy = self._get_or_create_child_spy("__aenter__", child_is_async=True)
enter_spy = self._decoy_spy_get_or_create_child_spy(
"__aenter__", child_is_async=True
)
return await enter_spy()

async def __aexit__(
Expand All @@ -71,49 +73,53 @@ async def __aexit__(
traceback: Optional[TracebackType],
) -> Optional[bool]:
"""Allow a spy to be used as a context manager."""
exit_spy = self._get_or_create_child_spy("__aexit__", child_is_async=True)
exit_spy = self._decoy_spy_get_or_create_child_spy(
"__aexit__", child_is_async=True
)
return cast(Optional[bool], await exit_spy(exc_type, exc_value, traceback))

def __repr__(self) -> str:
"""Get a helpful string representation of the spy."""
return f"<Decoy mock `{self._core.full_name}`>"
return f"<Decoy mock `{self._decoy_spy_core.full_name}`>"

def __getattr__(self, name: str) -> Any:
"""Get a property of the spy, always returning a child spy."""
# do not attempt to mock magic methods
if name.startswith("__") and name.endswith("__"):
return super().__getattribute__(name)

return self._get_or_create_child_spy(name)
return self._decoy_spy_get_or_create_child_spy(name)

def __setattr__(self, name: str, value: Any) -> None:
"""Set a property on the spy, recording the call."""
event = SpyEvent(
spy=self._core.info,
spy=self._decoy_spy_core.info,
payload=SpyPropAccess(
prop_name=name,
access_type=PropAccessType.SET,
value=value,
),
)
self._call_handler.handle(event)
self._spy_property_values[name] = value
self._decoy_spy_call_handler.handle(event)
self._decoy_spy_property_values[name] = value

def __delattr__(self, name: str) -> None:
"""Delete a property on the spy, recording the call."""
event = SpyEvent(
spy=self._core.info,
spy=self._decoy_spy_core.info,
payload=SpyPropAccess(prop_name=name, access_type=PropAccessType.DELETE),
)
self._call_handler.handle(event)
self._spy_property_values.pop(name, None)
self._decoy_spy_call_handler.handle(event)
self._decoy_spy_property_values.pop(name, None)

def _get_or_create_child_spy(self, name: str, child_is_async: bool = False) -> Any:
def _decoy_spy_get_or_create_child_spy(
self, name: str, child_is_async: bool = False
) -> Any:
"""Lazily construct a child spy, basing it on type hints if available."""
# check for any stubbed behaviors for property getter
get_result = self._call_handler.handle(
get_result = self._decoy_spy_call_handler.handle(
SpyEvent(
spy=self._core.info,
spy=self._decoy_spy_core.info,
payload=SpyPropAccess(
prop_name=name,
access_type=PropAccessType.GET,
Expand All @@ -124,30 +130,32 @@ def _get_or_create_child_spy(self, name: str, child_is_async: bool = False) -> A
if get_result:
return get_result.value

if name in self._spy_property_values:
return self._spy_property_values[name]
if name in self._decoy_spy_property_values:
return self._decoy_spy_property_values[name]

# return previously constructed (and cached) child spies
if name in self._spy_children:
return self._spy_children[name]
if name in self._decoy_spy_children:
return self._decoy_spy_children[name]

child_core = self._core.create_child_core(name=name, is_async=child_is_async)
child_spy = self._spy_creator.create(core=child_core)
self._spy_children[name] = child_spy
child_core = self._decoy_spy_core.create_child_core(
name=name, is_async=child_is_async
)
child_spy = self._decoy_spy_creator.create(core=child_core)
self._decoy_spy_children[name] = child_spy

return child_spy

def _call(self, *args: Any, **kwargs: Any) -> Any:
bound_args, bound_kwargs = self._core.bind_args(*args, **kwargs)
def _decoy_spy_call(self, *args: Any, **kwargs: Any) -> Any:
bound_args, bound_kwargs = self._decoy_spy_core.bind_args(*args, **kwargs)
call = SpyEvent(
spy=self._core.info,
spy=self._decoy_spy_core.info,
payload=SpyCall(
args=bound_args,
kwargs=bound_kwargs,
),
)

result = self._call_handler.handle(call)
result = self._decoy_spy_call_handler.handle(call)
return result.value if result else None


Expand All @@ -156,7 +164,7 @@ class AsyncSpy(BaseSpy):

async def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Handle a call to the spy asynchronously."""
result = self._call(*args, **kwargs)
result = self._decoy_spy_call(*args, **kwargs)
return (await result) if inspect.iscoroutine(result) else result


Expand All @@ -165,7 +173,7 @@ class Spy(BaseSpy):

def __call__(self, *args: Any, **kwargs: Any) -> Any:
"""Handle a call to the spy."""
return self._call(*args, **kwargs)
return self._decoy_spy_call(*args, **kwargs)


AnySpy = Union[AsyncSpy, Spy]
Expand All @@ -175,7 +183,7 @@ class SpyCreator:
"""Spy factory."""

def __init__(self, call_handler: CallHandler) -> None:
self._call_handler = call_handler
self._decoy_spy_call_handler = call_handler

@overload
def create(self, *, core: SpyCore) -> AnySpy:
Expand Down Expand Up @@ -208,5 +216,5 @@ def create(
return spy_cls(
core=core,
spy_creator=self,
call_handler=self._call_handler,
call_handler=self._decoy_spy_call_handler,
)

0 comments on commit 6a6868a

Please sign in to comment.