# This file is part of Pynguin.
#
# SPDX-FileCopyrightText: 2019–2026 Pynguin Contributors
#
# SPDX-License-Identifier: MIT
#
"""Provides utilities to trace the usage of objects."""
# Parts of the following code were taken from the awesome
# https://github.com/GrahamDumpleton/wrapt module and modified for our purposes.
# The wrapt library is under BSD 2-Clause "Simplified" License:
# Copyright (c) 2013-2022, Graham Dumpleton
# All rights reserved.
# SPDX-FileCopyrightText: 2013-2022 Graham Dumpleton
# SPDX-License-Identifier: BSD-2-Clause
from __future__ import annotations

import builtins
import contextlib
import dataclasses
import functools
import logging
import operator
from collections import defaultdict

from asciitree import BoxStyle, LeftAligned
from asciitree.drawing import BOX_LIGHT

from pynguin.utils.orderedset import OrderedSet, OrderedTypeSet
# Module-level logger.
LOGGER = logging.getLogger(__name__)
# Max depth for proxies. Afterwards we don't wrap values anymore.
_MAX_PROXY_NESTING = 5
# Argument types for which ``proxify`` records the concrete argument values (in
# addition to the argument types) when argument logging is enabled.
VALUE_TRACED_TYPES = {str}
[docs]
@dataclasses.dataclass
class UsageTraceNode:
    """The knowledge gathered by a proxy.

    Nodes form a tree: every entry of ``children`` is the node for an attribute
    (or method) name that was accessed on the proxy owning this node.
    """

    name: str

    # The depth of the proxy within the proxied object tree.
    # Zero indicates that it is the root.
    depth: int = 0

    # Attributes that have been accessed on this proxy.
    children: dict[str, UsageTraceNode] = dataclasses.field(init=False)

    # The types against which this proxy was checked.
    type_checks: OrderedTypeSet = dataclasses.field(default_factory=OrderedTypeSet)

    # Maps argument positions to their types.
    arg_types: dict[int, OrderedSet[type]] = dataclasses.field(
        default_factory=lambda: defaultdict(OrderedSet)
    )

    # Maps argument positions to used values.
    arg_values: dict[int, OrderedSet[object]] = dataclasses.field(
        default_factory=lambda: defaultdict(OrderedSet)
    )

    def __post_init__(self):
        """Initialize the children with a dictionary that creates nodes on demand."""
        self.children = DepthDefaultDict(self.depth)

    def find_path(self, path: tuple[str, ...]) -> UsageTraceNode | None:
        """Check if this usage trace tree has the given path.

        Args:
            path: The path to check

        Returns:
            The usage trace node at the end of the path, if it exists, otherwise None.
        """
        assert len(path) > 0, "Expected non-empty path."
        current = self
        for element in path:
            # Test membership first: indexing a missing key would create the node.
            if element not in current.children:
                return None
            current = current.children[element]
        return current

    def pretty(self) -> str:
        """Create a pretty representation of this object.

        Returns:
            A nicely formatted string
        """
        tree = LeftAligned(
            draw=BoxStyle(gfx=BOX_LIGHT, label_space=0, label_format="[{}]", indent=0)
        )
        return tree({self._format_str(): self._format_children()})

    def __len__(self) -> int:
        """Yield the length of a usage-trace node.

        The length is defined by the length of its children, argument types, and type
        checks. Recorded argument values are not counted.

        Returns:
            The length of a usage-trace node
        """
        return len(self.children) + len(self.arg_types) + len(self.type_checks)

    def _format_str(self):
        """Render this node (without its children) as a single-line label."""
        output = f"'{self.name}'"
        if len(self.type_checks) > 0:
            checks = ", ".join(check.__name__ for check in self.type_checks)
            output += ", type_checks: {" + checks + "}"
        if len(self.arg_types) > 0:
            per_position = ", ".join(
                str(idx) + ": {" + ", ".join(tp.__name__ for tp in types) + "}"
                for idx, types in self.arg_types.items()
            )
            output += ", arg_types: {" + per_position + "}"
        if len(self.arg_values) > 0:
            per_position = ", ".join(
                str(idx) + ": {" + ", ".join(repr(val) for val in values) + "}"
                for idx, values in self.arg_values.items()
            )
            output += ", arg_values: {" + per_position + "}"
        return output

    def _format_children(self):
        """Render the children recursively as nested label dictionaries."""
        return {
            child._format_str(): child._format_children()  # noqa: SLF001
            for child in self.children.values()
        }

    @staticmethod
    def from_proxy(obj: ObjectProxy) -> UsageTraceNode:
        """Extract knowledge from the given proxy.

        This is a convenience method, because the knowledge attribute is not visible
        on a proxy.

        Args:
            obj: the proxy from which we should extract knowledge

        Returns:
            The extracted knowledge.
        """
        return obj._self_usage_trace_node  # noqa: SLF001

    def merge(self, other: UsageTraceNode) -> None:
        """Merge the knowledge from the other proxy into this one.

        Argument types and values are unioned per argument position, so knowledge
        already stored in this node is kept and the sets of ``other`` are never
        shared with this node.

        Args:
            other: The knowledge that should be merged into this one.
        """
        assert self.name == other.name
        assert self.depth == other.depth
        # A plain ``dict.update`` would replace the whole set stored for a position
        # with the one from ``other``; add the elements one by one instead.
        for position, types in other.arg_types.items():
            known_types = self.arg_types.setdefault(position, OrderedSet())
            for tp in types:
                known_types.add(tp)
        self.type_checks.update(other.type_checks)
        for position, values in other.arg_values.items():
            known_values = self.arg_values.setdefault(position, OrderedSet())
            for value in values:
                known_values.add(value)
        for attr, knowledge in other.children.items():
            self.children[attr].merge(knowledge)
[docs]
class DepthDefaultDict(dict[str, UsageTraceNode]):  # noqa: FURB189
    """A dictionary that lazily creates a UsageTraceNode for unknown keys.

    Looking up a key that is not present creates a new node named after the key,
    stores it under that key, and returns it.
    """

    def __init__(self, depth: int) -> None:
        """Initializes the dictionary.

        Args:
            depth: The depth of the node owning this dictionary
        """
        super().__init__()
        self._depth = depth

    def __missing__(self, key: str) -> UsageTraceNode:
        """Creates and stores the node for a missing key.

        Args:
            key: The name of the key

        Returns:
            The newly created node, which is one level deeper than the owner
        """
        child = UsageTraceNode(key, depth=self._depth + 1)
        self[key] = child
        return child
[docs]
def proxify(*, log_args=False, no_wrap_return=False):
    """Decorator method to trace the usage of a method on a proxy.

    1. Unwraps the proxy.
    2. Stores the access to the method
    3. Stores the argument types if requested.
    4. Stores the argument values if requested and if the type is in VALUE_TRACED_TYPES.
    5. Wraps the result in a proxy object (unless requested otherwise).

    Only positional arguments are recorded; keyword arguments are passed through
    untouched.

    Args:
        log_args: Should we store the arguments (types and values)?
        no_wrap_return: Some cases, e.g., __int__ don't allow a return value that is
            not an int, so in some cases we have to disable wrapping.

    Returns:
        A decorated function
    """

    def wrap(function):
        # Keep the name, docstring and qualified name of the decorated method.
        @functools.wraps(function)
        def wrapped(*args, **kwargs):
            self = args[0]
            knowledge = UsageTraceNode.from_proxy(self)
            # Looking up the child creates it if necessary, which records the access.
            nested_knowledge = knowledge.children[function.__name__]
            operands = args[1:]
            if operands:
                if any(isinstance(arg, ObjectProxy) for arg in operands):
                    # Only record access but nothing more, if we interact with another
                    # proxy.
                    return function(*args, **kwargs)
                if log_args:
                    for pos, arg in enumerate(operands):
                        arg_type = type(arg)
                        # Store argument types
                        nested_knowledge.arg_types[pos].add(arg_type)
                        # Store argument values
                        if arg_type in VALUE_TRACED_TYPES:
                            nested_knowledge.arg_values[pos].add(arg)

            if no_wrap_return or knowledge.depth >= _MAX_PROXY_NESTING:
                return function(*args, **kwargs)
            return ObjectProxy(function(*args, **kwargs), usage_trace=nested_knowledge)

        return wrapped

    return wrap
class _ObjectProxyMethods:
    # Holder for the special properties that every proxy class must expose. The
    # namespace of this class is copied into each proxy class by
    # _ObjectProxyMetaType, so everything defined here ends up in those classes.
    #
    # We use properties to override the values of __module__ and
    # __doc__. If we add these in ObjectProxy, the derived class
    # __dict__ will still be setup to have string variants of these
    # attributes and the rules of descriptors means that they appear to
    # take precedence over the properties in the base class. To avoid
    # that, we copy the properties into the derived class type itself
    # via a meta class. In that way the properties will always take
    # precedence.

    @property
    def __module__(self):
        return self.__wrapped__.__module__  # type: ignore[attr-defined]

    @__module__.setter
    def __module__(self, value):
        self.__wrapped__.__module__ = value  # type: ignore[attr-defined]

    @property
    def __doc__(self):
        return self.__wrapped__.__doc__  # type: ignore[attr-defined]

    @__doc__.setter
    def __doc__(self, value):
        self.__wrapped__.__doc__ = value  # type: ignore[attr-defined]

    # We similarly use a property for __dict__. We need __dict__ to be
    # explicit to ensure that vars() works as expected.

    @property
    def __dict__(self):  # type: ignore[override]
        return self.__wrapped__.__dict__  # type: ignore[attr-defined]

    # We also need to propagate the special __weakref__ attribute for the
    # case of decorating classes which define it. If we do not define it
    # and use a function like inspect.getmembers() on a decorator class,
    # it will fail. This can't be in the derived classes.

    @property
    def __weakref__(self):
        return self.__wrapped__.__weakref__  # type: ignore[attr-defined]
class _ObjectProxyMetaType(type):
    """Metaclass that injects the shared special properties into proxy classes."""

    def __new__(cls, name, bases, dictionary):
        # The properties of _ObjectProxyMethods are written into the namespace of
        # the class under construction, so they win over same-named attributes
        # added while building a derived class and need not be re-implemented
        # in every derived class.
        shared_members = vars(_ObjectProxyMethods)
        dictionary.update(shared_members)
        return super().__new__(cls, name, bases, dictionary)
[docs]
def unwrap(obj):
    """Strip all proxy layers from the given object.

    Args:
        obj: The object to unwrap, possibly a (nested) proxy

    Returns:
        The innermost wrapped object, or ``obj`` itself if it is not a proxy
    """
    current = obj
    while True:
        if not isinstance(current, ObjectProxy):
            return current
        current = current.__wrapped__  # type:ignore[has-type]
class ObjectProxy(metaclass=_ObjectProxyMetaType):  # noqa: PLR0904
    """A proxy for (almost) any Python object.

    Operations on the proxy are forwarded to the wrapped object. Attribute
    accesses and the methods decorated with ``proxify`` are recorded in the
    proxy's usage-trace node.

    Native types implemented in C might be problematic.
    """

    def __init__(
        self,
        wrapped,
        *,
        usage_trace: UsageTraceNode | None = None,
        is_kwargs: bool = False,
    ) -> None:
        """Initializes the proxy around a wrapped object.

        Args:
            wrapped: The wrapped object
            usage_trace: An optional usage-trace node; a fresh node named "ROOT" is
                created if none is given
            is_kwargs: Whether the proxy is passed as **kwargs
        """
        # object.__setattr__ is used throughout to bypass our own __setattr__.
        object.__setattr__(self, "__wrapped__", wrapped)
        # What does this proxy know?
        object.__setattr__(
            self,
            "_self_usage_trace_node",
            UsageTraceNode(name="ROOT") if usage_trace is None else usage_trace,
        )
        # Is this proxy passed as **kwargs? If so, we can't return proxies from 'keys'
        # but must return the raw string objects.
        object.__setattr__(
            self,
            "_self_is_kwargs",
            is_kwargs,
        )
        # Python 3.2+ has the __qualname__ attribute, but it does not
        # allow it to be overridden using a property and it must instead
        # be an actual string object instead.
        with contextlib.suppress(AttributeError):
            object.__setattr__(self, "__qualname__", wrapped.__qualname__)
        # Python 3.10 onwards also does not allow itself to be overridden
        # using a property and it must instead be set explicitly.
        with contextlib.suppress(AttributeError):
            object.__setattr__(self, "__annotations__", wrapped.__annotations__)

    @property
    def __name__(self):  # noqa: PLW3201
        return self.__wrapped__.__name__  # type:ignore[has-type]

    @__name__.setter
    def __name__(self, value):  # noqa: PLW3201
        self.__wrapped__.__name__ = value  # type:ignore[has-type]

    # Report the class of the wrapped object instead of ObjectProxy.
    @property
    def __class__(self):
        return self.__wrapped__.__class__  # type:ignore[has-type]

    @__class__.setter
    def __class__(self, value):
        self.__wrapped__.__class__ = value  # type:ignore[has-type]

    def __dir__(self):
        return dir(self.__wrapped__)  # type:ignore[has-type]

    def __str__(self):
        return str(self.__wrapped__)  # type:ignore[has-type]

    @proxify(no_wrap_return=True)
    def __bytes__(self):
        return bytes(self.__wrapped__)  # type:ignore[has-type]

    def __repr__(self):
        return repr(self.__wrapped__)  # type:ignore[has-type]

    def __reversed__(self):
        return reversed(self.__wrapped__)  # type:ignore[has-type]

    @proxify()
    def __round__(self, *args):
        return round(self.__wrapped__, *args)  # type:ignore[has-type]

    def __mro_entries__(self, bases):
        return (self.__wrapped__,)  # type:ignore[has-type]

    # Rich comparisons: argument types are recorded, the result is not wrapped.

    @proxify(log_args=True, no_wrap_return=True)
    def __lt__(self, other):
        return self.__wrapped__ < other  # type:ignore[has-type]

    @proxify(log_args=True, no_wrap_return=True)
    def __le__(self, other):
        return self.__wrapped__ <= other  # type:ignore[has-type]

    @proxify(log_args=True, no_wrap_return=True)
    def __eq__(self, other):
        return self.__wrapped__ == other  # type:ignore[has-type]

    @proxify(log_args=True, no_wrap_return=True)
    def __ne__(self, other):
        return self.__wrapped__ != other  # type:ignore[has-type]

    @proxify(log_args=True, no_wrap_return=True)
    def __gt__(self, other):
        return self.__wrapped__ > other  # type:ignore[has-type]

    @proxify(log_args=True, no_wrap_return=True)
    def __ge__(self, other):
        return self.__wrapped__ >= other  # type:ignore[has-type]

    # Must be defined explicitly: a class that defines __eq__ without __hash__
    # would otherwise be unhashable.
    def __hash__(self):
        return hash(self.__wrapped__)  # type:ignore[has-type]

    @proxify(no_wrap_return=True)
    def __bool__(self):
        return bool(self.__wrapped__)  # type:ignore[has-type]

    def __setattr__(self, name, value):
        if name.startswith("_self_"):
            # Proxy-internal state is stored on the proxy itself.
            object.__setattr__(self, name, value)
        elif name == "__wrapped__":
            # Replacing the wrapped object: refresh the copied __qualname__ and
            # __annotations__, dropping them if the new object has none.
            object.__setattr__(self, name, value)
            with contextlib.suppress(AttributeError):
                object.__delattr__(self, "__qualname__")
            with contextlib.suppress(AttributeError):
                object.__setattr__(self, "__qualname__", value.__qualname__)
            with contextlib.suppress(AttributeError):
                object.__delattr__(self, "__annotations__")
            with contextlib.suppress(AttributeError):
                object.__setattr__(self, "__annotations__", value.__annotations__)
        elif name in {"__qualname__", "__annotations__"}:
            # Kept in sync on both the wrapped object and the proxy.
            setattr(self.__wrapped__, name, value)  # type:ignore[has-type]
            object.__setattr__(self, name, value)
        elif hasattr(type(self), name):
            # Names defined on the proxy class (e.g. properties) are set on the proxy.
            object.__setattr__(self, name, value)
        else:
            # Everything else is recorded as an access and forwarded.
            node = UsageTraceNode.from_proxy(self)
            accessed = node.children[name]
            # Node is created implicitly.
            assert accessed is not None
            setattr(self.__wrapped__, name, value)  # type:ignore[has-type]

    def __getattr__(self, name):
        # If we are being asked to lookup '__wrapped__' then the
        # '__init__()' method cannot have been called.
        if name == "__wrapped__":
            raise ValueError("wrapper has not been initialised")
        if name.startswith("_self_"):
            return object.__getattribute__(self, name)
        if name == "keys" and self._self_is_kwargs:
            # dict for **kwargs
            return getattr(self.__wrapped__, name)  # type:ignore[has-type]
        node = self._self_usage_trace_node
        # Done before getattr, to make sure we store the access in case of an
        # exception
        child_node = node.children[name]
        if node.depth >= _MAX_PROXY_NESTING:
            # Too deeply nested: hand out the raw attribute instead of a proxy.
            return getattr(self.__wrapped__, name)  # type:ignore[has-type]
        return ObjectProxy(
            getattr(self.__wrapped__, name),  # type:ignore[has-type]
            usage_trace=child_node,
        )

    def __delattr__(self, name):
        if name.startswith("_self_"):
            object.__delattr__(self, name)
        elif name == "__wrapped__":
            # The wrapped object itself cannot be removed.
            raise TypeError("__wrapped__ must be an object")
        elif name == "__qualname__":
            # Removed from both the proxy and the wrapped object.
            object.__delattr__(self, name)
            delattr(self.__wrapped__, name)  # type:ignore[has-type]
        elif hasattr(type(self), name):
            object.__delattr__(self, name)
        else:
            delattr(self.__wrapped__, name)  # type:ignore[has-type]

    # Binary arithmetic and bitwise operators.

    @proxify(log_args=True)
    def __add__(self, other):
        return self.__wrapped__ + other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __sub__(self, other):
        return self.__wrapped__ - other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __mul__(self, other):
        return self.__wrapped__ * other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __truediv__(self, other):
        return operator.truediv(self.__wrapped__, other)  # type:ignore[has-type]

    @proxify(log_args=True)
    def __floordiv__(self, other):
        return self.__wrapped__ // other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __mod__(self, other):
        return self.__wrapped__ % other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __divmod__(self, other):
        return divmod(self.__wrapped__, other)  # type:ignore[has-type]

    @proxify(log_args=True)
    def __pow__(self, other, *args):
        return pow(self.__wrapped__, other, *args)  # type:ignore[has-type]

    @proxify(log_args=True)
    def __lshift__(self, other):
        return self.__wrapped__ << other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rshift__(self, other):
        return self.__wrapped__ >> other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __and__(self, other):
        return self.__wrapped__ & other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __xor__(self, other):
        return self.__wrapped__ ^ other  # type:ignore[has-type]

    @proxify(log_args=True)
    def __or__(self, other):
        return self.__wrapped__ | other  # type:ignore[has-type]

    # Reflected variants: the wrapped object is the right-hand operand.

    @proxify(log_args=True)
    def __radd__(self, other):
        return other + self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rsub__(self, other):
        return other - self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rmul__(self, other):
        return other * self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rtruediv__(self, other):
        return operator.truediv(other, self.__wrapped__)  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rfloordiv__(self, other):
        return other // self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rmod__(self, other):
        return other % self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rdivmod__(self, other):
        return divmod(other, self.__wrapped__)  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rpow__(self, other, *args):
        return pow(other, self.__wrapped__, *args)  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rlshift__(self, other):
        return other << self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rrshift__(self, other):
        return other >> self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rand__(self, other):
        return other & self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __rxor__(self, other):
        return other ^ self.__wrapped__  # type:ignore[has-type]

    @proxify(log_args=True)
    def __ror__(self, other):
        return other | self.__wrapped__  # type:ignore[has-type]

    # In-place variants: the result is stored back into __wrapped__ (which goes
    # through __setattr__) and the proxy itself is returned.

    @proxify(log_args=True)
    def __iadd__(self, other):  # type:ignore[misc]
        self.__wrapped__ += other  # type:ignore[has-type]
        return self

    @proxify(log_args=True)
    def __isub__(self, other):  # type:ignore[misc]
        self.__wrapped__ -= other  # type:ignore[has-type]
        return self

    @proxify(log_args=True)
    def __imul__(self, other):  # type:ignore[misc]
        self.__wrapped__ *= other  # type:ignore[has-type]
        return self

    @proxify(log_args=True)
    def __itruediv__(self, other):  # type:ignore[misc]
        self.__wrapped__ = operator.itruediv(
            self.__wrapped__,  # type: ignore[has-type]
            other,
        )
        return self

    @proxify(log_args=True)
    def __ifloordiv__(self, other):  # type:ignore[misc]
        self.__wrapped__ //= other
        return self

    @proxify(log_args=True)
    def __imod__(self, other):  # type:ignore[misc]
        self.__wrapped__ %= other
        return self

    @proxify(log_args=True)
    def __ipow__(self, other):  # type:ignore[misc]
        self.__wrapped__ **= other
        return self

    @proxify(log_args=True)
    def __ilshift__(self, other):  # type:ignore[misc]
        self.__wrapped__ <<= other
        return self

    @proxify(log_args=True)
    def __irshift__(self, other):  # type:ignore[misc]
        self.__wrapped__ >>= other
        return self

    @proxify(log_args=True)
    def __iand__(self, other):  # type:ignore[misc]
        self.__wrapped__ &= other
        return self

    @proxify(log_args=True)
    def __ixor__(self, other):  # type:ignore[misc]
        self.__wrapped__ ^= other
        return self

    @proxify(log_args=True)
    def __ior__(self, other):  # type:ignore[misc]
        self.__wrapped__ |= other
        return self

    # Unary operators.

    @proxify()
    def __neg__(self):
        return -self.__wrapped__

    @proxify()
    def __pos__(self):
        return +self.__wrapped__

    @proxify()
    def __abs__(self):
        return abs(self.__wrapped__)

    @proxify()
    def __invert__(self):
        return ~self.__wrapped__

    # Conversions: the results are not wrapped in a proxy.

    @proxify(no_wrap_return=True)
    def __int__(self):
        return int(self.__wrapped__)

    @proxify(no_wrap_return=True)
    def __float__(self):
        return float(self.__wrapped__)

    @proxify(no_wrap_return=True)
    def __complex__(self):
        return complex(self.__wrapped__)

    @proxify(no_wrap_return=True)
    def __index__(self):
        return operator.index(self.__wrapped__)

    # Container protocol.

    @proxify()
    def __len__(self):
        # len turns result into an integer
        return len(self.__wrapped__)

    @proxify(log_args=True)
    def __contains__(self, value):
        return value in self.__wrapped__

    @proxify(log_args=True)
    def __getitem__(self, key):
        return self.__wrapped__[key]

    @proxify(log_args=True)
    def __setitem__(self, key, value):
        self.__wrapped__[key] = value

    @proxify()
    def __delitem__(self, key):
        del self.__wrapped__[key]

    # Context-manager protocol: forwarded without recording anything.

    def __enter__(self):
        return self.__wrapped__.__enter__()

    def __exit__(self, *args, **kwargs):
        return self.__wrapped__.__exit__(*args, **kwargs)

    def __iter__(self):
        node = self._self_usage_trace_node
        # Looking up the child records that the proxy was iterated.
        nested_node = node.children["__iter__"]
        if node.depth >= _MAX_PROXY_NESTING:
            # Too deeply nested: yield the raw elements.
            yield from self.__wrapped__
        else:
            # All yielded elements share the same usage-trace node.
            for i in self.__wrapped__:
                proxy = ObjectProxy(i, usage_trace=nested_node)
                yield proxy

    # These do not give us any hint.
    # def __copy__(self):
    #     raise NotImplementedError(
    #         'object proxy must define __copy__()')
    #
    # def __deepcopy__(self, memo):
    #     raise NotImplementedError(
    #         'object proxy must define __deepcopy__()')
    #
    # def __reduce__(self):
    #     raise NotImplementedError(
    #         'object proxy must define __reduce_ex__()')
    #
    # def __reduce_ex__(self, protocol):
    #     raise NotImplementedError(
    #         'object proxy must define __reduce_ex__()')

    @proxify(log_args=True)
    def __call__(self, *args, **kwargs):  # noqa: D102
        return self.__wrapped__(*args, **kwargs)
[docs]
@contextlib.contextmanager
def shim_isinstance():
    """Context manager that temporarily replaces isinstance with a shim.

    The shim is aware of ObjectProxies: whenever a proxy is checked against a type
    (or a tuple of types), the checked types are recorded in the proxy's usage
    trace. Checks against ObjectProxy itself are not recorded.

    The original ``isinstance`` is restored when the context is left, even if the
    body of the ``with`` statement raises.

    Yields:
        resets the shim
    """
    orig_isinstance = builtins.isinstance

    def shim(inst, types):
        # type() is used because a proxy reports the class of its wrapped object
        # through __class__.
        if type(inst) is ObjectProxy:
            if types is ObjectProxy or orig_isinstance(types, ObjectProxy):
                return orig_isinstance(inst, types)
            if orig_isinstance(types, tuple):
                if any(typ is ObjectProxy or orig_isinstance(typ, ObjectProxy) for typ in types):
                    return orig_isinstance(inst, types)
                UsageTraceNode.from_proxy(inst).type_checks.update(types)
            else:
                UsageTraceNode.from_proxy(inst).type_checks.add(types)
        return orig_isinstance(inst, types)

    builtins.isinstance = shim
    try:
        yield
    finally:
        # Without the finally, an exception raised inside the with-body would leave
        # the shim installed for the rest of the process.
        builtins.isinstance = orig_isinstance