# This file is part of Pynguin.
#
# SPDX-FileCopyrightText: 2019–2026 Pynguin Contributors
#
# SPDX-License-Identifier: MIT
#
"""Contains all code related to test-case execution."""
from __future__ import annotations
import contextlib
import inspect
import logging
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Sized
from dataclasses import dataclass, field
from functools import wraps
from itertools import count
from math import inf
from opcode import opname
from types import BuiltinFunctionType, BuiltinMethodType, CodeType, MethodType, TracebackType
from typing import TYPE_CHECKING, Concatenate, ParamSpec
from bytecode.instr import CellVar, FreeVar
import pynguin.assertion.assertion as ass
import pynguin.slicer.executedinstruction as ei
import pynguin.testcase.statement as stmt
import pynguin.utils.typetracing as tt
from pynguin.instrumentation import PynguinCompare, version
from pynguin.utils.exceptions import TracingAbortedException
from pynguin.utils.orderedset import OrderedSet
from pynguin.utils.type_utils import (
given_exception_matches,
is_bytes,
is_numeric,
is_string,
string_distance,
string_le_distance,
string_lt_distance,
)
if TYPE_CHECKING:
from collections.abc import Generator
from typing_extensions import Self
from pynguin.instrumentation.controlflow import CFG, BasicBlockNode, ControlDependenceGraph
# Built-in types whose instances cannot be modified in place.
immutable_types = (int, float, complex, str, tuple, frozenset, bytes)
# A variable reference as accepted by the tracing API: either a plain name or a
# ``bytecode`` cell/free variable object.
VariableName = str | CellVar | FreeVar
[docs]
@dataclass
class ExecutedAssertion:
    """Data class for assertions of a testcase traced during execution for slicing."""

    # The position inside the execution trace of the executed assertion.
    trace_position: int
    # The assertion object of a statement that was executed.
    assertion: ass.Assertion
[docs]
@dataclass
class ExecutionTrace:
    """Container for everything that was observed while executing traced code.

    It holds the executed code objects, predicate hit counts and branch
    distances, covered and checked lines, the list of executed instructions,
    the seen object addresses, and the executed assertions.
    """

    _logger = logging.getLogger(__name__)
    executed_code_objects: OrderedSet[int] = field(default_factory=OrderedSet)
    executed_predicates: dict[int, int] = field(default_factory=dict)
    true_distances: dict[int, float] = field(default_factory=dict)
    false_distances: dict[int, float] = field(default_factory=dict)
    covered_line_ids: OrderedSet[int] = field(default_factory=OrderedSet)
    executed_instructions: list[ei.ExecutedInstruction] = field(default_factory=list)
    object_addresses: OrderedSet[int] = field(default_factory=OrderedSet)
    executed_assertions: list[ExecutedAssertion] = field(default_factory=list)
    checked_lines: OrderedSet[int] = field(default_factory=OrderedSet)

    def merge(self, other: ExecutionTrace) -> None:
        """Fold the contents of another trace into this one.

        Predicate hit counts are summed, distances keep the smaller value, sets
        are united, and instructions are appended. Assertion positions of the
        other trace are shifted by the number of instructions this trace held
        before the merge, so they keep pointing at the same instruction.

        Args:
            other: The trace whose data is merged into this trace
        """
        self.executed_code_objects.update(other.executed_code_objects)

        hit_counts = self.executed_predicates
        for predicate_id, hits in other.executed_predicates.items():
            hit_counts[predicate_id] = hit_counts.get(predicate_id, 0) + hits

        self._merge_min(self.true_distances, other.true_distances)
        self._merge_min(self.false_distances, other.false_distances)

        self.covered_line_ids.update(other.covered_line_ids)
        self.checked_lines.update(other.checked_lines)

        # Must be taken before the instructions of ``other`` are appended.
        offset: int = len(self.executed_instructions)
        self.executed_instructions.extend(other.executed_instructions)
        self.object_addresses.update(other.object_addresses)

        for entry in other.executed_assertions:
            self.executed_assertions.append(
                ExecutedAssertion(entry.trace_position + offset, entry.assertion)
            )

    @staticmethod
    def _merge_min(target: dict[int, float], source: dict[int, float]) -> None:
        """Copy ``source`` into ``target``, keeping the smaller value per key.

        Args:
            target: the dictionary that receives the values
            source: the dictionary the values are taken from
        """
        for key, candidate in source.items():
            current = target.get(key, inf)
            target[key] = min(current, candidate)

    def update_predicate_distances(
        self, distance_true: float, distance_false: float, predicate: int
    ) -> None:
        """Record one more execution of a predicate together with its distances.

        The hit counter is incremented and each stored distance is lowered to
        the new measurement if that is smaller.

        Args:
            distance_true: the measured true distance
            distance_false: the measured false distance
            predicate: the predicate id
        """
        previous_hits = self.executed_predicates.get(predicate, 0)
        self.executed_predicates[predicate] = previous_hits + 1

        best_true = self.true_distances.get(predicate, inf)
        self.true_distances[predicate] = min(best_true, distance_true)

        best_false = self.false_distances.get(predicate, inf)
        self.false_distances[predicate] = min(best_false, distance_false)

    def add_instruction(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
    ) -> None:
        """Append a plain ``ExecutedInstruction`` to the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
        """
        self.executed_instructions.append(
            ei.ExecutedInstruction(module, code_object_id, node_id, opcode, None, lineno, offset)
        )

    def add_memory_instruction(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        arg_name: str | tuple[str, str],
        arg_address: int | tuple[int, int],
        is_mutable_type: bool | tuple[bool, bool],  # noqa: FBT001
        object_creation: bool | tuple[bool, bool],  # noqa: FBT001
    ) -> None:
        """Append an ``ExecutedMemoryInstruction`` to the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            arg_name: the name of the argument
            arg_address: the memory address of the argument
            is_mutable_type: if the argument is mutable
            object_creation: if the instruction creates the object used
        """
        self.executed_instructions.append(
            ei.ExecutedMemoryInstruction(
                module,
                code_object_id,
                node_id,
                opcode,
                arg_name,
                lineno,
                offset,
                arg_address,
                is_mutable_type,
                object_creation,
            )
        )

    def add_attribute_instruction(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        attr_name: str,
        src_address: int,
        arg_address: int,
        is_mutable_type: bool,  # noqa: FBT001
        is_method: bool,  # noqa: FBT001
    ) -> None:
        """Append an ``ExecutedAttributeInstruction`` to the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            attr_name: the name of the accessed attribute
            src_address: the memory address of the attribute
            arg_address: the memory address of the argument
            is_mutable_type: if the attribute is mutable
            is_method: if the attribute is a method
        """
        self.executed_instructions.append(
            ei.ExecutedAttributeInstruction(
                module,
                code_object_id,
                node_id,
                opcode,
                attr_name,
                lineno,
                offset,
                src_address,
                arg_address,
                is_mutable_type,
                is_method,
            )
        )

    def add_jump_instruction(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        target_id: int,
    ) -> None:
        """Append an ``ExecutedControlInstruction`` to the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            target_id: the target offset to jump to
        """
        self.executed_instructions.append(
            ei.ExecutedControlInstruction(
                module, code_object_id, node_id, opcode, target_id, lineno, offset
            )
        )

    def add_call_instruction(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        arg: int,
    ) -> None:
        """Append an ``ExecutedCallInstruction`` to the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            arg: the argument to the instruction
        """
        self.executed_instructions.append(
            ei.ExecutedCallInstruction(module, code_object_id, node_id, opcode, arg, lineno, offset)
        )

    def add_return_instruction(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
    ) -> None:
        """Append an ``ExecutedReturnInstruction`` to the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
        """
        self.executed_instructions.append(
            ei.ExecutedReturnInstruction(
                module, code_object_id, node_id, opcode, None, lineno, offset
            )
        )
[docs]
@dataclass
class SubjectProperties:
    """Contains properties about the subject under test.

    The subject properties are `code objects`, `predicates` and `lines`:

    - **Code Objects**:
      Compiled chunks of code (functions, methods, modules).
      Tracked in `CodeObjectMetaData` with references to the compiled code, parent,
      control graphs, and a unique ID. Represent the program's structural units.

    - **Predicates**:
      Decision points within code objects (e.g., ``if``, ``while``).
      Tracked in `PredicateMetaData` with line number, owning code object, and graph node.
      Used for branch coverage and measuring branch distances.

    - **Lines**:
      Individual lines of code within code objects.
      Tracked in `LineMetaData` with file name and line number.
      Used for measuring line coverage.

    **Example**::

        def example(x):
            if x > 0:  # Predicate
                return "pos"
            return "non-pos"

        # The function ``example`` is a Code Object and ``x > 0`` is a Predicate.
    """

    # TODO(lk): SubjectProperties and ExecutionTracer should be separated
    # The instrumentation tracer that is used to trace the execution
    instrumentation_tracer: InstrumentationExecutionTracer = field(
        default_factory=lambda: InstrumentationExecutionTracer(ExecutionTracer())
    )
    # The counter used to generate unique code object ids
    code_object_counter: count[int] = field(default_factory=count)
    # Maps all known ids of Code Objects to meta information
    existing_code_objects: dict[int, CodeObjectMetaData] = field(default_factory=dict)
    # Maps all known ids of predicates to meta information
    existing_predicates: dict[int, PredicateMetaData] = field(default_factory=dict)
    # Stores which line id represents which line in which file
    existing_lines: dict[int, LineMetaData] = field(default_factory=dict)

    @property
    def branch_less_code_objects(self) -> Iterable[int]:
        """Get the existing code objects that do not contain a branch.

        Every code object is initially seen as branch-less until a predicate is
        registered for it. The ids of code objects that own a predicate are
        collected once when the property is accessed, so each code object is
        answered with a set lookup instead of a scan over all predicates.

        Returns:
            The existing code objects that do not contain a branch.
        """
        branching_ids = {
            metadata.code_object_id for metadata in self.existing_predicates.values()
        }
        return (
            code_object_id
            for code_object_id in self.existing_code_objects
            if code_object_id not in branching_ids
        )

    def reset(self) -> None:
        """Resets the subject properties.

        Restarts the id counter, clears all registered code objects, predicates
        and lines, and resets the instrumentation tracer.
        """
        self.code_object_counter = count()
        self.existing_code_objects.clear()
        self.existing_predicates.clear()
        self.existing_lines.clear()
        self.instrumentation_tracer.reset()

    def create_code_object_id(self) -> int:
        """Create a new code object ID.

        Returns:
            A new code object ID.
        """
        return next(self.code_object_counter)

    def register_code_object(self, code_object_id: int, meta: CodeObjectMetaData) -> None:
        """Declare that a code object exists.

        Args:
            code_object_id: the id of the code object, which should be used to identify
                the object during instrumentation.
            meta: the code objects existing

        Raises:
            AssertionError: if the id was already registered
        """
        assert code_object_id not in self.existing_code_objects, (
            "Code object already registered in existing code objects"
        )
        self.existing_code_objects[code_object_id] = meta

    def register_predicate(self, meta: PredicateMetaData) -> int:
        """Declare that a predicate exists.

        Args:
            meta: Metadata about the predicates

        Returns:
            the id of the predicate, which can be used to identify the predicate
            during instrumentation.

        Raises:
            AssertionError: if a predicate for the same node and code object
                was already registered
        """
        key = (meta.node, meta.code_object_id)
        # Short-circuits on the first clash instead of materialising a set of
        # all known predicates on every registration.
        assert not any(
            (known.node, known.code_object_id) == key
            for known in self.existing_predicates.values()
        ), "Predicate with the same node already registered"
        predicate_id = len(self.existing_predicates)
        self.existing_predicates[predicate_id] = meta
        return predicate_id

    def register_line(self, meta: LineMetaData) -> int:
        """Tracks the existence of a line.

        If an equal line was registered before, its id is reused; otherwise the
        line is stored under the next free id.

        Args:
            meta: Metadata about the line

        Returns:
            the id of the registered line
        """
        # One pass over the known lines replaces the former membership test
        # followed by two list copies and an index search.
        for line_id, known in self.existing_lines.items():
            if known is meta or known == meta:
                return line_id
        line_id = len(self.existing_lines)
        self.existing_lines[line_id] = meta
        return line_id

    def validate_execution_trace(self, execution_trace: ExecutionTrace) -> None:
        """Validate the execution trace.

        Checks that every code object id, predicate id and line id recorded in
        the trace is known to these subject properties.

        Args:
            execution_trace: The execution trace to validate

        Raises:
            AssertionError: if the execution trace is invalid
        """
        for code_object_id in execution_trace.executed_code_objects:
            assert code_object_id in self.existing_code_objects, (
                f"Code object id {code_object_id} not registered in subject properties"
            )
        for predicate_id in execution_trace.executed_predicates:
            assert predicate_id in self.existing_predicates, (
                f"Predicate id {predicate_id} not registered in subject properties"
            )
        for line_id in execution_trace.covered_line_ids:
            assert line_id in self.existing_lines, (
                f"Line id {line_id} not registered in subject properties"
            )

    def lineids_to_linenos(self, line_ids: OrderedSet[int]) -> OrderedSet[int]:
        """Convenience method to translate line ids to line numbers.

        Args:
            line_ids: The ids that should be translated.

        Returns:
            The line numbers.
        """
        return OrderedSet([self.existing_lines[line_id].line_number for line_id in line_ids])
[docs]
class AbstractExecutionTracer(ABC):  # noqa: PLR0904
    """An abstract execution tracer.

    The results are stored in an execution trace.
    """

    @abstractmethod
    def __enter__(self) -> Self:
        """Activate the tracer for the current thread.

        Returns:
            The tracer itself, so it can be used as a context manager.
        """

    @abstractmethod
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Deactivate the tracer for the current thread.

        Args:
            exc_type: The type of the exception, if any.
            exc_value: The value of the exception, if any.
            traceback: The traceback of the exception, if any.
        """

    @abstractmethod
    def check(self) -> None:
        """Check if the thread that called this method should still be running.

        Raises:
            RuntimeError: if the thread is not running anymore.
        """

    @property
    @abstractmethod
    def import_trace(self) -> ExecutionTrace:
        """The trace that was generated when the SUT was imported.

        Returns:
            The execution trace after executing the import statements
        """

    @abstractmethod
    def reset(self) -> None:
        """Resets everything.

        Should be called before instrumentation. Clears all data, so we can handle a
        reload of the SUT.
        """

    @abstractmethod
    def store_import_trace(self) -> None:
        """Stores the current trace as the import trace.

        Should only be done once, after a module was loaded. The import trace will be
        merged into every subsequently recorded trace.
        """

    @abstractmethod
    def init_trace(self) -> None:
        """Create a new trace that only contains the trace data from the import."""

    @abstractmethod
    def is_disabled(self) -> bool:
        """Should we track anything?

        We might have to disable tracing, e.g. when calling __eq__ ourselves.
        Otherwise, we create an endless recursion.

        Returns:
            Whether we should track anything
        """

    @abstractmethod
    def enable(self) -> None:
        """Enable tracing."""

    @abstractmethod
    def disable(self) -> None:
        """Disable tracing."""

    @contextlib.contextmanager
    def temporarily_disable(self) -> Generator[None, None, None]:
        """Temporarily disable tracing.

        If the tracing is already disabled, do nothing. Otherwise tracing is
        disabled for the duration of the ``with`` body and enabled again
        afterwards, also when the body raises an exception.
        """
        if self.is_disabled():
            yield
            return
        self.disable()
        try:
            yield
        finally:
            # Restore tracing even if the guarded code raised; otherwise the
            # tracer would silently stay disabled.
            self.enable()

    @contextlib.contextmanager
    def temporarily_enable(self) -> Generator[None, None, None]:
        """Temporarily enable tracing.

        If the tracing is already enabled, do nothing. Otherwise tracing is
        enabled for the duration of the ``with`` body and disabled again
        afterwards, also when the body raises an exception.
        """
        if not self.is_disabled():
            yield
            return
        self.enable()
        try:
            yield
        finally:
            # Restore the disabled state even if the guarded code raised.
            self.disable()

    @abstractmethod
    def stop(self) -> None:
        """Stop the tracer.

        This should be called when the tracer is no longer needed, e.g., when the test
        case execution is finished.
        """

    @abstractmethod
    def get_trace(self) -> ExecutionTrace:
        """Get the trace with the current information.

        Returns:
            The current execution trace
        """

    @abstractmethod
    def executed_code_object(self, code_object_id: int) -> None:
        """Mark a code object as executed.

        This means, that the routine which refers to this code object was at least
        called once.

        Args:
            code_object_id: the code object id to mark

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def executed_compare_predicate(
        self, value1, value2, predicate: int, cmp_op: PynguinCompare
    ) -> None:
        """A predicate that is based on a comparison was executed.

        Args:
            value1: the first value
            value2: the second value
            predicate: the predicate identifier
            cmp_op: the compare operation

        Raises:
            RuntimeError: raised when called from another thread.
            AssertionError: when encountering an unknown compare op.
        """

    @abstractmethod
    def executed_bool_predicate(self, value, predicate: int) -> None:
        """A predicate that is based on a boolean value was executed.

        Args:
            value: the value
            predicate: the predicate identifier

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def executed_in_presence_predicate(self, value1, value2, predicate: int) -> None:
        """An auxiliary membership predicate was executed.

        Computes a guided branch distance for ``value1 in value2`` with an
        execution-time guard on the container size to limit overhead.

        Args:
            value1: The prospective key/index to check membership for.
            value2: The container to check membership in.
            predicate: The predicate identifier.

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def executed_exception_match(
        self,
        err: BaseException | type[BaseException],
        exc: type[BaseException],
        predicate: int,
    ) -> None:
        """A predicate that is based on exception matching was executed.

        Args:
            err: The raised exception
            exc: The matching condition
            predicate: the predicate identifier

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_line_visit(self, line_id: int) -> None:
        """Tracks the visit of a line.

        Args:
            line_id: the id of the line that was visited

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_generic(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
    ) -> None:
        """Track a generic instruction inside the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_memory_access(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        var_name: VariableName | tuple[VariableName, VariableName],
        var_value: object,
    ) -> None:
        """Track a memory access instruction in the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            var_name: the used variable name
            var_value: the value stored in the used variable

        Raises:
            ValueError: when no argument is given
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_attribute_access(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        attr_name: str | None,
        obj: object,
    ) -> None:
        """Track an attribute access instruction in the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            attr_name: the name of the accessed attribute
            obj: the object containing the accessed attribute

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_jump(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        target_id: int,
    ) -> None:
        """Track a jump instruction in the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            target_id: the offset of the target of the jump

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_call(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        arg: int,
    ) -> None:
        """Track a method call instruction in the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction
            arg: the argument used in the method call

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_return(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
    ) -> None:
        """Track a return instruction in the trace.

        Args:
            module: File name of the module containing the instruction
            code_object_id: code object containing the instruction
            node_id: the node of the code object containing the instruction
            opcode: the opcode of the instruction
            lineno: the line number of the instruction
            offset: the offset of the instruction

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_exception_assertion(self, statement: stmt.Statement) -> None:
        """Track the position of an exception assertion in the trace.

        Normally, to track an assertion, we trace the POP_JUMP_IF_TRUE instruction
        contained by each assertion. The pytest exception assertion does not use
        an assertion containing this instruction.
        Therefore, we trace the instruction that was last executed before
        the exception.

        Args:
            statement: the statement causing the exception

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def track_assertion_position(self, assertion: ass.Assertion) -> None:
        """Track the position of an assertion in the trace.

        Args:
            assertion: the assertion of the statement

        Raises:
            RuntimeError: raised when called from another thread
        """

    @abstractmethod
    def __getstate__(self) -> dict:
        """Gets the state.

        Returns:
            The state
        """

    @abstractmethod
    def __setstate__(self, state: dict) -> None:
        """Sets the state.

        Args:
            state: The state
        """
def _eq(val1, val2) -> float:
"""Distance computation for '=='.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
try:
if val1 == val2:
return 0.0
except TypeError:
pass
if is_numeric(val1) and is_numeric(val2):
return float(abs(val1 - val2))
if is_string(val1) and is_string(val2):
return string_distance(val1, val2)
if is_bytes(val1) and is_bytes(val2):
return string_distance(val1.decode("iso-8859-1"), val2.decode("iso-8859-1"))
return inf
def _neq(val1, val2) -> float:
"""Distance computation for '!='.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
if val1 != val2:
return 0.0
return 1.0
def _lt(val1, val2) -> float:
"""Distance computation for '<'.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
if val1 < val2:
return 0.0
if is_numeric(val1) and is_numeric(val2):
return (float(val1) - float(val2)) + 1.0
if is_string(val1) and is_string(val2):
return string_lt_distance(val1, val2)
if is_bytes(val1) and is_bytes(val2):
return string_lt_distance(val1.decode("iso-8859-1"), val2.decode("iso-8859-1"))
return inf
def _le(val1, val2) -> float:
"""Distance computation for '<='.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
if val1 <= val2:
return 0.0
if is_numeric(val1) and is_numeric(val2):
return float(val1) - float(val2)
if is_string(val1) and is_string(val2):
return string_le_distance(val1, val2)
if is_bytes(val1) and is_bytes(val2):
return string_le_distance(val1.decode("iso-8859-1"), val2.decode("iso-8859-1"))
return inf
def _in(val1, val2) -> float:
"""Distance computation for 'in'.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
try:
if val1 in val2:
return 0.0
except TypeError:
# If `val2` does not support membership tests, we will handle it below.
pass
# TODO(fk) maybe limit this to certain collections?
# Check only if collection size is within some range,
# otherwise the check might take very long.
# If `val2` is not iterable, there is no element to compare against.
if not isinstance(val2, Iterable):
return inf
# Use the shortest distance to any element of the iterable.
return min([_eq(val1, v) for v in val2] + [inf])
def _nin(val1, val2) -> float:
"""Distance computation for 'not in'.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
try:
if val1 not in val2:
return 0.0
except TypeError:
# Fallback to assuming element is not in collection if `val2` is not iterable
return 0.0
return 1.0
def _is(val1, val2) -> float:
"""Distance computation for 'is'.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
if val1 is val2:
return 0.0
return 1.0
def _isn(val1, val2) -> float:
"""Distance computation for 'is not'.
Args:
val1: the first value
val2: the second value
Returns:
the distance
"""
if val1 is not val2:
return 0.0
return 1.0
_P = ParamSpec("_P")
def _early_return(
func: Callable[Concatenate[ExecutionTracer, _P], None],
) -> Callable[Concatenate[ExecutionTracer, _P], None]:
@wraps(func)
def wrapper(self: ExecutionTracer, *args: _P.args, **kwargs: _P.kwargs) -> None:
if self.is_disabled():
return
self.check()
func(self, *args, **kwargs)
return wrapper
[docs]
class ExecutionTracer(AbstractExecutionTracer): # noqa: PLR0904
"""Tracks branch distances and covered statements during execution.
The results are stored in an execution trace.
"""
_logger = logging.getLogger(__name__)
[docs]
class TracerLocalState(threading.local):
    """Encapsulate state that is thread specific."""

    def __init__(self):  # noqa: D107
        super().__init__()
        # Tracing starts out enabled.
        self.enabled = True
        # Every state object starts with an empty trace.
        self.trace = ExecutionTrace()
def __init__(self) -> None:  # noqa: D107
    # Contains the trace information that is generated when a module is imported
    self._import_trace = ExecutionTrace()
    # Thread local state
    self._thread_local_state = ExecutionTracer.TracerLocalState()
    # Needs both attributes above: seeds the current trace from the import trace.
    self.init_trace()
    # Identifier of the thread that is allowed to trace; set in ``__enter__``
    # and cleared again by ``stop``.
    self._current_thread_identifier: int | None = None
    # NOTE(review): not read anywhere in the visible part of this class.
    self._current_code_object_id = 0
def __enter__(self) -> Self:
    """Remember the calling thread as the one that is allowed to trace.

    Returns:
        The tracer itself.
    """
    self._current_thread_identifier = threading.current_thread().ident
    return self
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Stop the tracer when the ``with`` block is left.

    The exception information is not inspected, and an exception raised
    inside the block is not suppressed.

    Args:
        exc_type: The type of the exception, if any.
        exc_value: The value of the exception, if any.
        traceback: The traceback of the exception, if any.
    """
    self.stop()
[docs]
def check(self) -> None:  # noqa: D102
    """Abort the calling thread unless it is the thread registered for tracing.

    Raises:
        TracingAbortedException: if the identifier of the calling thread
            differs from the stored thread identifier.
    """
    caller_identifier = threading.current_thread().ident
    if caller_identifier == self._current_thread_identifier:
        return
    raise TracingAbortedException(
        "The current thread shall not be executed anymore, thus I kill it."
    )
@property
def import_trace(self) -> ExecutionTrace:  # noqa: D102
    """Return a copy of the trace that was stored as import trace.

    A new trace is created and the stored import trace is merged into it, so
    the stored trace object itself is not handed out.

    Returns:
        A new trace holding the merged data of the import trace.
    """
    copied = ExecutionTrace()
    copied.merge(self._import_trace)
    return copied
@property
def state(self) -> dict:
    """Get the current state.

    The state consists of the import trace, the registered thread identifier
    and the ``enabled`` flag and trace of the calling thread's local state.

    Returns:
        The current state
    """
    local = self._thread_local_state
    local_snapshot = {
        "enabled": local.enabled,
        "trace": local.trace,
    }
    return {
        "import_trace": self._import_trace,
        "current_thread_identifier": self._current_thread_identifier,
        "thread_local_state": local_snapshot,
    }

@state.setter
def state(self, state: dict) -> None:
    """Set the current state.

    A fresh thread-local state object is installed and filled with the stored
    ``enabled`` flag and trace.

    Args:
        state: The state to set
    """
    self._import_trace = state["import_trace"]
    self._current_thread_identifier = state["current_thread_identifier"]
    fresh_local = ExecutionTracer.TracerLocalState()
    self._thread_local_state = fresh_local
    saved_local = state["thread_local_state"]
    fresh_local.enabled = saved_local["enabled"]
    fresh_local.trace = saved_local["trace"]
[docs]
def reset(self) -> None:  # noqa: D102
    """Discard the import trace and start over with an empty current trace."""
    self._import_trace = ExecutionTrace()
    # The current trace is rebuilt from the (now empty) import trace.
    self.init_trace()
[docs]
def store_import_trace(self) -> None:  # noqa: D102
    """Keep the calling thread's current trace as the import trace.

    Afterwards a new current trace is created that starts from that import
    trace.
    """
    self._import_trace = self._thread_local_state.trace
    self.init_trace()
[docs]
def init_trace(self) -> None:  # noqa: D102
    """Replace the calling thread's trace with a copy of the import trace."""
    new_trace = ExecutionTrace()
    # Merging into a new object keeps the stored import trace itself untouched
    # when the current trace is extended later on.
    new_trace.merge(self._import_trace)
    self._thread_local_state.trace = new_trace
[docs]
def is_disabled(self) -> bool:  # noqa: D102
    """Tell whether tracing is switched off for the calling thread.

    Returns:
        ``True`` if the thread-local ``enabled`` flag is not set.
    """
    return not self._thread_local_state.enabled
[docs]
def enable(self) -> None:  # noqa: D102
    """Switch tracing on by setting the thread-local ``enabled`` flag."""
    self._thread_local_state.enabled = True
[docs]
def disable(self) -> None:  # noqa: D102
    """Switch tracing off by clearing the thread-local ``enabled`` flag."""
    self._thread_local_state.enabled = False
[docs]
def stop(self) -> None:  # noqa: D102
    """Forget the registered thread.

    ``check`` compares the caller's thread identifier with the stored one, so
    clearing it makes later ``check`` calls fail until ``__enter__`` registers
    a thread again.
    """
    self._current_thread_identifier = None
[docs]
def get_trace(self) -> ExecutionTrace:  # noqa: D102
    """Return the trace of the calling thread.

    Returns:
        The thread-local trace object itself, not a copy.
    """
    return self._thread_local_state.trace
[docs]
@_early_return
def executed_code_object(self, code_object_id: int) -> None:  # noqa: D102
    """Add the code object id to the executed code objects of the current trace.

    Args:
        code_object_id: the code object id to mark
    """
    self._thread_local_state.trace.executed_code_objects.add(code_object_id)
[docs]
@_early_return
def executed_compare_predicate(  # noqa: D102, C901
    self, value1, value2, predicate: int, cmp_op: PynguinCompare
) -> None:
    """Compute and record both branch distances of a comparison predicate.

    For every compare operation the true distance is computed for the
    operation itself and the false distance for its negation.

    Args:
        value1: the first value
        value2: the second value
        predicate: the predicate identifier
        cmp_op: the compare operation

    Raises:
        AssertionError: when encountering an unknown compare op.
    """
    # Tracing is switched off while the distances are computed.
    with self.temporarily_disable():
        value1 = tt.unwrap(value1)
        value2 = tt.unwrap(value2)
        match cmp_op:
            case PynguinCompare.EQ:
                distance_true, distance_false = _eq(value1, value2), _neq(value1, value2)
            case PynguinCompare.NE:
                distance_true, distance_false = _neq(value1, value2), _eq(value1, value2)
            case PynguinCompare.LT:
                distance_true, distance_false = (
                    _lt(value1, value2),
                    _le(value2, value1),
                )
            case PynguinCompare.LE:
                distance_true, distance_false = (
                    _le(value1, value2),
                    _lt(value2, value1),
                )
            case PynguinCompare.GT:
                # ``a > b`` is expressed as ``b < a``.
                distance_true, distance_false = (
                    _lt(value2, value1),
                    _le(value1, value2),
                )
            case PynguinCompare.GE:
                # ``a >= b`` is expressed as ``b <= a``.
                distance_true, distance_false = (
                    _le(value2, value1),
                    _lt(value1, value2),
                )
            case PynguinCompare.IN:
                distance_true, distance_false = (
                    _in(value1, value2),
                    _nin(value1, value2),
                )
            case PynguinCompare.NOT_IN:
                distance_true, distance_false = (
                    _nin(value1, value2),
                    _in(value1, value2),
                )
            case PynguinCompare.IS:
                distance_true, distance_false = (
                    _is(value1, value2),
                    _isn(value1, value2),
                )
            case PynguinCompare.IS_NOT:
                distance_true, distance_false = (
                    _isn(value1, value2),
                    _is(value1, value2),
                )
            case _:
                raise AssertionError("Unknown compare op")
        # Note the argument order: the false distance is passed first.
        self._update_metrics(distance_false, distance_true, predicate)
[docs]
@_early_return
def executed_bool_predicate(self, value, predicate: int) -> None:
    """Record the branch distances of a predicate that tests a value's truthiness.

    The value is unwrapped with ``tt.unwrap`` and evaluated for truthiness.
    If it is falsy, the false branch was taken and the distance to the true
    branch is ``1.0``. If it is truthy, the true branch was taken and the
    distance to the false branch is derived from the value: its length for
    ``Sized`` objects, its absolute value for numeric objects, and ``inf``
    otherwise. The result is handed to ``_update_metrics``; everything runs
    inside ``self.temporarily_disable()``.

    Args:
        value: The object whose truthiness decided the branch.
        predicate: Id of the predicate whose distances are updated.
    """
    with self.temporarily_disable():
        distance_true = 0.0
        distance_false = 0.0
        # Might be necessary when using Proxies.
        value = tt.unwrap(value)
        if value:
            if isinstance(value, Sized):
                # Sized instances evaluate to False if they are empty,
                # and to True otherwise, thus we can use their size as a distance
                # measurement.
                distance_false = len(value)
            elif is_numeric(value):
                # For numeric value, we can use their absolute value
                distance_false = float(abs(value))
            else:
                # Necessary to use inf instead of 1.0 here,
                # so that a value for which we can't compute a false distance
                # always has the greatest distance to the false branch than an
                # object for which we can compute a distance.
                distance_false = inf
            # The true branch was taken, so the false distance has to be strictly
            # positive. A truthy object can still report a length of 0 (custom
            # ``__bool__``) and NaN is truthy but not comparable; both would
            # violate the invariants asserted in ``_update_metrics``. In these
            # cases no meaningful distance exists, so treat it as "unknown".
            # ``not (x > 0.0)`` is used on purpose: it is also true for NaN.
            if not distance_false > 0.0:
                distance_false = inf
        else:
            distance_true = 1.0
        self._update_metrics(distance_false, distance_true, predicate)
[docs]
@_early_return
def executed_in_presence_predicate(self, value1, value2, predicate: int) -> None:
    """Compute an auxiliary 'IN' predicate distance.

    This helper provides guidance for subscripts like ``container[key]`` by
    reporting a membership distance ``key in container``. Both arguments are
    unwrapped with ``tt.unwrap``, the true distance is taken from ``_in`` and
    the false distance from ``_nin``, and the pair is handed to
    ``_update_metrics``.

    NOTE(review): this method itself applies no limit on the container size;
    if a bound such as ``max_container_size`` is intended, it has to be
    enforced by the distance helpers or by the caller -- confirm.

    Args:
        value1: The prospective key/index to look up (e.g., ``key``).
        value2: The container to check membership in (e.g., ``container``).
        predicate: The predicate id to update.
    """
    with self.temporarily_disable():
        key = tt.unwrap(value1)
        container = tt.unwrap(value2)
        distance_true = _in(key, container)
        distance_false = _nin(key, container)
        self._update_metrics(distance_false, distance_true, predicate)
[docs]
@_early_return
def executed_exception_match(
    self,
    err: BaseException | type[BaseException],
    exc: type[BaseException],
    predicate: int,
) -> None:
    """Record the branch distances of an exception-matching predicate.

    The taken branch gets distance ``0.0`` and the other branch ``1.0``,
    depending on whether ``given_exception_matches`` reports a match.

    Args:
        err: The raised exception, either as an instance or as its type.
        exc: The exception type it is matched against.
        predicate: Id of the predicate whose distances are updated.
    """
    with self.temporarily_disable():
        # Might be necessary when using Proxies.
        err = tt.unwrap(err)
        exc = tt.unwrap(exc)

        # Matching is done on the type, so reduce an instance to its class.
        raised_type = type(err) if isinstance(err, BaseException) else err

        if given_exception_matches(raised_type, exc):
            distance_true, distance_false = 0.0, 1.0
        else:
            distance_true, distance_false = 1.0, 0.0

        self._update_metrics(distance_false, distance_true, predicate)
[docs]
@_early_return
def track_line_visit(self, line_id: int) -> None:
    """Register a line as covered in the current trace.

    Args:
        line_id: Id of the visited line; it is added to the trace's
            ``covered_line_ids`` collection.
    """
    trace = self._thread_local_state.trace
    trace.covered_line_ids.add(line_id)
def _update_metrics(self, distance_false: float, distance_true: float, predicate: int):
assert distance_true >= 0.0, "True distance cannot be negative"
assert distance_false >= 0.0, "False distance cannot be negative"
assert (distance_true == 0.0) ^ (distance_false == 0.0), (
"Exactly one distance must be 0.0, i.e., one branch must be taken."
)
self._thread_local_state.trace.update_predicate_distances(
distance_true=distance_true,
distance_false=distance_false,
predicate=predicate,
)
[docs]
@_early_return
def track_generic(  # noqa: PLR0917
    self,
    module: str,
    code_object_id: int,
    node_id: int,
    opcode: int,
    lineno: int,
    offset: int,
) -> None:
    """Append an instruction without extra payload to the current trace.

    All arguments are passed on unchanged to the trace's ``add_instruction``.

    Args:
        module: Name of the module the instruction belongs to.
        code_object_id: Id of the containing code object.
        node_id: Id of the containing node.
        opcode: Opcode of the instruction.
        lineno: Line number of the instruction.
        offset: Offset of the instruction.
    """
    trace = self._thread_local_state.trace
    trace.add_instruction(module, code_object_id, node_id, opcode, lineno, offset)
def _extract_arguments(
    self, var_name: VariableName, var_value: object
) -> tuple[str, int, bool, bool]:
    """Derive the trace information of a single accessed variable.

    Args:
        var_name: The variable's name; ``CellVar``/``FreeVar`` objects are
            reduced to their ``name`` attribute.
        var_value: The value bound to the variable.

    Returns:
        A tuple ``(name, address, mutable, created)``: the plain name, the
        ``id`` of the value, whether the value's type is absent from
        ``immutable_types``, and whether the address was seen for the first
        time in the current trace.
    """
    name = var_name.name if isinstance(var_name, CellVar | FreeVar) else var_name
    address = id(var_value)

    # Only the exact type is looked up, subclasses of immutable types count as mutable.
    is_mutable = type(var_value) not in immutable_types

    # An address that is not yet known to the trace marks the definition of a
    # completely new object (required later during slicing).
    known_addresses = self._thread_local_state.trace.object_addresses
    is_new_object = bool(address) and address not in known_addresses
    if is_new_object:
        known_addresses.add(address)

    return name, address, is_mutable, is_new_object
[docs]
@_early_return
def track_memory_access(  # noqa: PLR0917
    self,
    module: str,
    code_object_id: int,
    node_id: int,
    opcode: int,
    lineno: int,
    offset: int,
    var_name: VariableName | tuple[VariableName, VariableName],
    var_value: object,
) -> None:
    """Append a memory-access instruction to the current trace.

    The accessed variable is described through ``_extract_arguments``. When
    both ``var_name`` and ``var_value`` are 2-tuples, the two variables are
    processed one after the other and every piece of information is stored
    as a pair.

    Args:
        module: Name of the module the instruction belongs to.
        code_object_id: Id of the containing code object.
        node_id: Id of the containing node.
        opcode: Opcode of the instruction.
        lineno: Line number of the instruction.
        offset: Offset of the instruction.
        var_name: Name of the accessed variable, or a pair of names.
        var_value: Value of the accessed variable, or a pair of values.

    Raises:
        AssertionError: If ``var_name`` is neither a supported name nor a
            matching pair of names and values.
    """
    # IMPORT_NAMEs may not have arguments
    assert var_name or opname[opcode] in version.IMPORT_NAME_NAMES, (
        "A memory access instruction must have an argument or be an import"
    )

    arg_name: str | tuple[str, str]
    arg_address: int | tuple[int, int]
    mutable_type: bool | tuple[bool, bool]
    object_creation: bool | tuple[bool, bool]

    is_pair = (
        isinstance(var_name, tuple)
        and isinstance(var_value, tuple)
        and len(var_name) == 2
        and len(var_value) == 2
    )
    if is_pair:
        # Keep the order: the first variable is registered before the second.
        first = self._extract_arguments(var_name[0], var_value[0])
        second = self._extract_arguments(var_name[1], var_value[1])
        arg_name = (first[0], second[0])
        arg_address = (first[1], second[1])
        mutable_type = (first[2], second[2])
        object_creation = (first[3], second[3])
    elif isinstance(var_name, (str, CellVar, FreeVar)):
        arg_name, arg_address, mutable_type, object_creation = self._extract_arguments(
            var_name, var_value
        )
    else:
        raise AssertionError(f"Unexpected argument types: {var_name}, {var_value}")

    trace = self._thread_local_state.trace
    trace.add_memory_instruction(
        module,
        code_object_id,
        node_id,
        opcode,
        lineno,
        offset,
        arg_name,
        arg_address,
        mutable_type,
        object_creation,
    )
[docs]
@_early_return
def track_attribute_access(  # noqa: PLR0917
    self,
    module: str,
    code_object_id: int,
    node_id: int,
    opcode: int,
    lineno: int,
    offset: int,
    attr_name: str | None,
    obj: object,
) -> None:
    """Append an attribute-access instruction to the current trace.

    Without an attribute name, the name ``"None"`` is recorded together with
    the ``id`` of ``obj`` as source address and ``-1`` as attribute address.
    Otherwise the source address comes from ``attribute_lookup`` and the
    attribute is read from ``obj`` with ``getattr`` to obtain its address and
    type.

    Args:
        module: Name of the module the instruction belongs to.
        code_object_id: Id of the containing code object.
        node_id: Id of the containing node.
        opcode: Opcode of the instruction.
        lineno: Line number of the instruction.
        offset: Offset of the instruction.
        attr_name: Name of the accessed attribute, if any.
        obj: The object the attribute is accessed on.
    """
    arg_type: type
    if attr_name is None:
        attr_name, src_address, arg_address, arg_type = "None", id(obj), -1, type(None)
    else:
        src_address = self.attribute_lookup(obj, attr_name)
        attr_value = getattr(obj, attr_name)
        arg_address, arg_type = id(attr_value), type(attr_value)

    # Different built-in methods and functions often have the same address when
    # accessed sequentially.
    # The address is not recorded in such cases.
    if any(arg_type is builtin for builtin in (BuiltinMethodType, BuiltinFunctionType)):
        arg_address = -1

    # Everything whose exact type is not listed as immutable counts as mutable.
    mutable_type = arg_type not in immutable_types

    is_method = any(arg_type is kind for kind in (MethodType, BuiltinMethodType))

    trace = self._thread_local_state.trace
    trace.add_attribute_instruction(
        module,
        code_object_id,
        node_id,
        opcode,
        lineno,
        offset,
        attr_name,
        src_address,
        arg_address,
        mutable_type,
        is_method,
    )
[docs]
@_early_return
def track_jump(  # noqa: PLR0917
    self,
    module: str,
    code_object_id: int,
    node_id: int,
    opcode: int,
    lineno: int,
    offset: int,
    target_id: int,
) -> None:
    """Append a jump instruction to the current trace.

    All arguments are passed on unchanged to the trace's ``add_jump_instruction``.

    Args:
        module: Name of the module the instruction belongs to.
        code_object_id: Id of the containing code object.
        node_id: Id of the containing node.
        opcode: Opcode of the instruction.
        lineno: Line number of the instruction.
        offset: Offset of the instruction.
        target_id: Id of the jump target.
    """
    trace = self._thread_local_state.trace
    trace.add_jump_instruction(
        module, code_object_id, node_id, opcode, lineno, offset, target_id
    )
[docs]
@_early_return
def track_call(  # noqa: PLR0917
    self,
    module: str,
    code_object_id: int,
    node_id: int,
    opcode: int,
    lineno: int,
    offset: int,
    arg: int,
) -> None:
    """Append a call instruction to the current trace.

    All arguments are passed on unchanged to the trace's ``add_call_instruction``.

    Args:
        module: Name of the module the instruction belongs to.
        code_object_id: Id of the containing code object.
        node_id: Id of the containing node.
        opcode: Opcode of the instruction.
        lineno: Line number of the instruction.
        offset: Offset of the instruction.
        arg: Argument of the call instruction.
    """
    trace = self._thread_local_state.trace
    trace.add_call_instruction(module, code_object_id, node_id, opcode, lineno, offset, arg)
[docs]
@_early_return
def track_return(  # noqa: PLR0917
    self,
    module: str,
    code_object_id: int,
    node_id: int,
    opcode: int,
    lineno: int,
    offset: int,
) -> None:
    """Append a return instruction to the current trace.

    All arguments are passed on unchanged to the trace's ``add_return_instruction``.

    Args:
        module: Name of the module the instruction belongs to.
        code_object_id: Id of the containing code object.
        node_id: Id of the containing node.
        opcode: Opcode of the instruction.
        lineno: Line number of the instruction.
        offset: Offset of the instruction.
    """
    trace = self._thread_local_state.trace
    trace.add_return_instruction(module, code_object_id, node_id, opcode, lineno, offset)
[docs]
@_early_return
def track_exception_assertion(self, statement: stmt.Statement) -> None:
    """Record the exception assertion of a statement at the current trace position.

    The assertion is attached to the most recently executed instruction, i.e.
    the last index of the trace's ``executed_instructions``.

    Args:
        statement: A statement for which ``has_only_exception_assertion()``
            holds; its first assertion is recorded.
    """
    assert statement.has_only_exception_assertion()

    trace = self._thread_local_state.trace
    last_position = len(trace.executed_instructions) - 1
    executed = ExecutedAssertion(
        trace_position=last_position,
        assertion=next(iter(statement.assertions)),
    )
    trace.executed_assertions.append(executed)
[docs]
@_early_return
def track_assertion_position(self, assertion: ass.Assertion) -> None:
    """Record an assertion at the position of the last executed true-branch jump.

    The executed instructions are scanned from the end; the first instruction
    for which ``version.get_branch_type`` yields a truthy result determines
    the trace position stored with the assertion.

    Args:
        assertion: The assertion to record.
    """
    executed = self.get_trace().executed_instructions
    last_index = len(executed) - 1

    # -1 means "not found"; it is also the result for an empty trace.
    boolean_jump = -1
    for steps_back, instruction in enumerate(reversed(executed)):
        # ``None`` (no branch instruction) and ``False`` are both skipped.
        if version.get_branch_type(instruction.opcode):
            boolean_jump = last_index - steps_back
            break

    assert boolean_jump != -1, "Node in code object did not contain a boolean jump instruction"

    self._thread_local_state.trace.executed_assertions.append(
        ExecutedAssertion(trace_position=boolean_jump, assertion=assertion)
    )
[docs]
@staticmethod
def attribute_lookup(object_type, attribute: str) -> int:
"""Check the dictionary of classes making up the MRO (method resolution order).
It is inspired by the `_PyType_Lookup` C function in CPython.
Args:
object_type: The type object to check
attribute: the attribute to check for in the class. It must be a data descriptor
to be prioritized here.
Returns:
The id of the object type or the class if it has the attribute, -1 otherwise
"""
for clss in type(object_type).__mro__:
if attribute in clss.__dict__ and inspect.isdatadescriptor(
clss.__dict__.get(attribute)
):
# Class in the MRO hierarchy has attribute
# Class has attribute and attribute is a data descriptor
return id(clss)
# This would lead to an infinite recursion and thus a crash of the program
if attribute in {"__getattr__", "__getitem__"}:
return -1
# Check if the dictionary of the object on which lookup is performed
if (
hasattr(object_type, "__dict__")
and object_type.__dict__
and attribute in object_type.__dict__
):
return id(object_type)
if (
hasattr(object_type, "__slots__")
and object_type.__slots__
and attribute in object_type.__slots__
):
return id(object_type)
# Check if attribute in MRO hierarchy (no need for data descriptor)
for clss in type(object_type).__mro__:
if attribute in clss.__dict__:
return id(clss)
return -1
def __repr__(self) -> str:
return "ExecutionTracer"
def __getstate__(self) -> dict:
return self.state
def __setstate__(self, state: dict) -> None:
self.state = state
[docs]
class InstrumentationExecutionTracer(AbstractExecutionTracer):  # noqa: PLR0904
    """A proxy that forwards every tracer call to an exchangeable `ExecutionTracer`.

    When a module is instrumented, the instructions inserted into its bytecode
    refer directly to a tracer object. Without a proxy it would therefore be
    impossible to modify the tracer afterwards. Because the bytecode only
    references this proxy and never the tracer itself, the wrapped tracer can be
    replaced through the ``tracer`` property without any problems.

    Every method simply delegates to the method of the same name on the wrapped
    tracer and passes its arguments on unchanged.
    """

    def __init__(self, tracer: ExecutionTracer):
        """Create a proxy around the given tracer.

        Args:
            tracer: The tracer all calls are forwarded to.
        """
        self._tracer = tracer

    @property
    def tracer(self) -> ExecutionTracer:
        """The tracer that calls are currently forwarded to."""
        return self._tracer

    @tracer.setter
    def tracer(self, tracer: ExecutionTracer) -> None:
        self._tracer = tracer

    def __enter__(self) -> Self:
        # Enter the wrapped tracer, but hand out the proxy itself.
        self._tracer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # The result of the wrapped tracer's ``__exit__`` is not propagated.
        self._tracer.__exit__(exc_type, exc_value, traceback)

    def check(self) -> None:
        """Forward to ``check`` of the wrapped tracer."""
        self._tracer.check()

    @property
    def import_trace(self) -> ExecutionTrace:
        """The ``import_trace`` of the wrapped tracer."""
        return self._tracer.import_trace

    def reset(self) -> None:
        """Forward to ``reset`` of the wrapped tracer."""
        self._tracer.reset()

    def store_import_trace(self) -> None:
        """Forward to ``store_import_trace`` of the wrapped tracer."""
        self._tracer.store_import_trace()

    def init_trace(self) -> None:
        """Forward to ``init_trace`` of the wrapped tracer."""
        self._tracer.init_trace()

    def is_disabled(self) -> bool:
        """Return the result of ``is_disabled`` of the wrapped tracer."""
        return self._tracer.is_disabled()

    def enable(self) -> None:
        """Forward to ``enable`` of the wrapped tracer."""
        self._tracer.enable()

    def disable(self) -> None:
        """Forward to ``disable`` of the wrapped tracer."""
        self._tracer.disable()

    def stop(self) -> None:
        """Forward to ``stop`` of the wrapped tracer."""
        self._tracer.stop()

    def get_trace(self) -> ExecutionTrace:
        """Return the result of ``get_trace`` of the wrapped tracer."""
        return self._tracer.get_trace()

    def executed_code_object(self, code_object_id: int) -> None:
        """Forward to ``executed_code_object`` of the wrapped tracer."""
        self._tracer.executed_code_object(code_object_id)

    def executed_compare_predicate(
        self, value1, value2, predicate: int, cmp_op: PynguinCompare
    ) -> None:
        """Forward to ``executed_compare_predicate`` of the wrapped tracer."""
        self._tracer.executed_compare_predicate(value1, value2, predicate, cmp_op)

    def executed_bool_predicate(self, value, predicate: int) -> None:
        """Forward to ``executed_bool_predicate`` of the wrapped tracer."""
        self._tracer.executed_bool_predicate(value, predicate)

    def executed_in_presence_predicate(self, value1, value2, predicate: int) -> None:
        """Forward to ``executed_in_presence_predicate`` of the wrapped tracer."""
        self._tracer.executed_in_presence_predicate(
            value1=value1, value2=value2, predicate=predicate
        )

    def executed_exception_match(
        self,
        err: BaseException | type[BaseException],
        exc: type[BaseException],
        predicate: int,
    ) -> None:
        """Forward to ``executed_exception_match`` of the wrapped tracer."""
        self._tracer.executed_exception_match(err, exc, predicate)

    def track_line_visit(self, line_id: int) -> None:
        """Forward to ``track_line_visit`` of the wrapped tracer."""
        self._tracer.track_line_visit(line_id)

    def track_generic(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
    ) -> None:
        """Forward to ``track_generic`` of the wrapped tracer."""
        self._tracer.track_generic(module, code_object_id, node_id, opcode, lineno, offset)

    def track_memory_access(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        var_name: VariableName | tuple[VariableName, VariableName],
        var_value: object,
    ) -> None:
        """Forward to ``track_memory_access`` of the wrapped tracer."""
        self._tracer.track_memory_access(
            module,
            code_object_id,
            node_id,
            opcode,
            lineno,
            offset,
            var_name,
            var_value,
        )

    def track_attribute_access(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        attr_name: str | None,
        obj: object,
    ) -> None:
        """Forward to ``track_attribute_access`` of the wrapped tracer."""
        self._tracer.track_attribute_access(
            module,
            code_object_id,
            node_id,
            opcode,
            lineno,
            offset,
            attr_name,
            obj,
        )

    def track_jump(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        target_id: int,
    ) -> None:
        """Forward to ``track_jump`` of the wrapped tracer."""
        self._tracer.track_jump(module, code_object_id, node_id, opcode, lineno, offset, target_id)

    def track_call(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
        arg: int,
    ) -> None:
        """Forward to ``track_call`` of the wrapped tracer."""
        self._tracer.track_call(module, code_object_id, node_id, opcode, lineno, offset, arg)

    def track_return(  # noqa: PLR0917
        self,
        module: str,
        code_object_id: int,
        node_id: int,
        opcode: int,
        lineno: int,
        offset: int,
    ) -> None:
        """Forward to ``track_return`` of the wrapped tracer."""
        self._tracer.track_return(module, code_object_id, node_id, opcode, lineno, offset)

    def track_exception_assertion(self, statement: stmt.Statement) -> None:
        """Forward to ``track_exception_assertion`` of the wrapped tracer."""
        self._tracer.track_exception_assertion(statement)

    def track_assertion_position(self, assertion: ass.Assertion) -> None:
        """Forward to ``track_assertion_position`` of the wrapped tracer."""
        self._tracer.track_assertion_position(assertion)

    def __repr__(self) -> str:
        return "InstrumentationExecutionTracer"

    def __getstate__(self) -> dict:
        # Only the wrapped tracer is needed to rebuild the proxy.
        return {"tracer": self._tracer}

    def __setstate__(self, state: dict) -> None:
        self._tracer = state["tracer"]