# Source code for pynguin.generator

#  This file is part of Pynguin.
#
#  SPDX-FileCopyrightText: 2019–2026 Pynguin Contributors
#
#  SPDX-License-Identifier: MIT
#
"""Pynguin is an automated unit test generation framework for Python.

The framework generates unit tests for a given Python module.  For this it
supports various approaches, such as a random approach, similar to Randoop or a
whole-suite approach, based on a genetic algorithm, as implemented in EvoSuite.  The
framework allows exporting test suites in various styles, i.e., using the `unittest`
library from the Python standard library or tests in the style used by the PyTest
framework.

Pynguin is supposed to be used as a standalone command-line application but it
can also be used as a library by calling the functions of this module directly.
"""

from __future__ import annotations

import datetime
import enum
import importlib
import inspect
import json
import logging
import math
import sys
from pathlib import Path
from typing import TYPE_CHECKING, cast

try:
    import random

    from faker import Faker

    FANDANGO_FAKER_AVAILABLE = True
except ImportError:
    FANDANGO_FAKER_AVAILABLE = False

import pynguin.assertion.assertiongenerator as ag
import pynguin.assertion.llmassertiongenerator as lag
import pynguin.assertion.mutation_analysis.mutators as mu
import pynguin.assertion.mutation_analysis.operators as mo
import pynguin.assertion.mutation_analysis.strategies as ms
import pynguin.configuration as config
import pynguin.ga.chromosome as chrom
import pynguin.ga.chromosomevisitor as cv
import pynguin.ga.computations as ff
import pynguin.ga.generationalgorithmfactory as gaf
import pynguin.ga.postprocess as pp
import pynguin.ga.testsuitechromosome as tsc
import pynguin.utils.statistics.stats as stat

if config.configuration.pynguinml.ml_testing_enabled or TYPE_CHECKING:
    import pynguin.utils.pynguinml.ml_testing_resources as tr

from pynguin.analyses.constants import (
    ConstantProvider,
    DelegatingConstantProvider,
    DynamicConstantProvider,
    EmptyConstantProvider,
    RestrictedConstantPool,
    collect_static_constants,
)
from pynguin.analyses.module import generate_test_cluster
from pynguin.assertion.mutation_analysis.controller import MutationController
from pynguin.assertion.mutation_analysis.transformer import ParentNodeTransformer
from pynguin.instrumentation.machinery import InstrumentationFinder, install_import_hook
from pynguin.instrumentation.tracer import SubjectProperties
from pynguin.slicer.statementslicingobserver import RemoteStatementSlicingObserver
from pynguin.testcase import export
from pynguin.testcase.execution import (
    RemoteAssertionExecutionObserver,
    SubprocessTestCaseExecutor,
    TestCaseExecutor,
)
from pynguin.utils import randomness
from pynguin.utils.exceptions import ConfigurationException, CoroutineFoundException
from pynguin.utils.llm import LLM, LLMProvider, extract_code
from pynguin.utils.report import (
    get_coverage_report,
    render_coverage_report,
    render_xml_coverage_report,
)
from pynguin.utils.statistics.runtimevariable import RuntimeVariable

if TYPE_CHECKING:
    from collections.abc import Callable

    from pynguin.analyses.module import ModuleTestCluster
    from pynguin.assertion.mutation_analysis.operators.base import MutationOperator
    from pynguin.ga.algorithms.generationalgorithm import GenerationAlgorithm

if config.configuration.pynguinml.ml_testing_enabled or TYPE_CHECKING:
    from pynguin.utils.pynguinml import np_rng


@enum.unique
class ReturnCode(enum.IntEnum):
    """Return codes for Pynguin to signal result.

    The members are plain integers, so they can be handed to ``sys.exit``
    directly by a command-line front end.
    """

    OK = 0
    """Symbolises that the execution ended as expected."""

    SETUP_FAILED = 1
    """Symbolises that the execution failed in the setup phase."""

    NO_TESTS_GENERATED = 2
    """Symbolises that no test could be generated."""

    FINAL_METRICS_TRACKING_FAILED = 3
    """Symbolises that the final metrics tracking failed."""
# Module-level logger, named after this module; used by all functions below.
_LOGGER = logging.getLogger(__name__)
def set_configuration(configuration: config.Configuration) -> None:
    """Initialises the test generator with the given configuration.

    The configuration is stored in the module-global ``config.configuration``,
    which is where the functions of this module read their settings from.

    Args:
        configuration: The configuration to use.
    """
    config.configuration = configuration
def run_pynguin() -> ReturnCode:
    """Run the test generation.

    The result of the test generation is indicated by the resulting ReturnCode.
    If the configured algorithm is ``config.Algorithm.LLM`` the LLM-only
    generation is run, otherwise the regular generation pipeline.

    Returns:
        See ReturnCode.

    Raises:
        ConfigurationException: In case the configuration is illegal
    """
    try:
        _LOGGER.info("Start Pynguin Test Generation…")
        if config.configuration.algorithm == config.Algorithm.LLM:
            return _run_llm()
        return _run()
    finally:
        # Logged on every exit path, including exceptions.
        _LOGGER.info("Stop Pynguin Test Generation…")
def _setup_test_cluster() -> ModuleTestCluster | None:
    """Analyse the configured module and build its test cluster.

    Returns:
        The test cluster, or None if the module could not be found, contains
        coroutines, or exposes no accessible object under test.
    """
    try:
        test_cluster = generate_test_cluster(
            config.configuration.module_name,
            config.configuration.type_inference.type_inference_strategy,
        )
    except ModuleNotFoundError as ex:
        _LOGGER.exception(
            "Module %s could not be found. "
            "This is likely due to a missing dependency. "
            "It may also be caused by a bug in the SUT, especially if it uses C-modules.",
            ex.name,
        )
        return None
    except CoroutineFoundException as ex:
        _LOGGER.exception(
            "Pynguin does not support test generation for coroutines (async def): %s", ex
        )
        return None

    if test_cluster.num_accessible_objects_under_test() == 0:
        _LOGGER.error("SUT contains nothing we can test.")
        return None
    return test_cluster


def _setup_path() -> bool:
    """Set up the run-time path.

    Inserts the configured project path at the front of ``sys.path``.

    Returns:
        True if the project path is a directory and was inserted, False otherwise.
    """
    if not Path(config.configuration.project_path).is_dir():
        _LOGGER.error("%s is not a valid project path", config.configuration.project_path)
        return False
    _LOGGER.debug("Setting up path for %s", config.configuration.project_path)
    sys.path.insert(0, config.configuration.project_path)
    return True


def _setup_import_hook(
    dynamic_constant_provider: DynamicConstantProvider | None,
) -> SubjectProperties:
    """Install the instrumenting import hook for the configured module.

    Args:
        dynamic_constant_provider: The provider handed to the import hook, if any.

    Returns:
        The freshly created subject properties that were registered with the hook.
    """
    _LOGGER.debug("Setting up instrumentation for %s", config.configuration.module_name)
    subject_properties = SubjectProperties()
    install_import_hook(
        config.configuration.module_name,
        subject_properties,
        dynamic_constant_provider=dynamic_constant_provider,
    )
    return subject_properties


def _load_sut(subject_properties: SubjectProperties) -> bool:
    """Import (or reload) the module under test with the tracer active.

    Args:
        subject_properties: The subject properties whose tracer records the import.

    Returns:
        True if the module was loaded, False if any exception was raised.
    """
    module_name = config.configuration.module_name
    try:
        # We need to activate the tracer so the import trace is recorded.
        with subject_properties.instrumentation_tracer:
            # If the module is already imported, we need to reload it for the
            # ExecutionTracer to successfully register the subject_properties
            if module_name in sys.modules:
                importlib.reload(sys.modules[module_name])
            else:
                importlib.import_module(module_name)
    except Exception as ex:
        # A module could not be imported because some dependencies
        # are missing or it is malformed or any error is raised during the import
        _LOGGER.exception("Failed to load SUT: %s", ex)
        return False
    return True


def _setup_report_dir() -> bool:
    """Create the report directory if statistics or a coverage report are requested.

    Returns:
        False if the directory was needed but could not be created, True otherwise.
    """
    # Report dir only needs to be created when statistics or coverage report is enabled.
    statistics_output = config.configuration.statistics_output
    report_needed = (
        statistics_output.statistics_backend != config.StatisticsBackend.NONE
        or statistics_output.create_coverage_report
    )
    if not report_needed:
        return True

    report_dir = Path(statistics_output.report_dir).absolute()
    try:
        report_dir.mkdir(parents=True, exist_ok=True)
    except OSError:  # includes FileNotFoundError
        _LOGGER.exception(
            "Cannot create report dir %s",
            config.configuration.statistics_output.report_dir,
        )
        return False
    return True


def _setup_random_number_generator() -> None:
    """Seed every random number generator in use with the configured seed."""
    seed = config.configuration.seeding.seed
    _LOGGER.info("Using seed %d", seed)
    randomness.RNG.seed(seed)
    if config.configuration.pynguinml.ml_testing_enabled:
        np_rng.init_rng(seed)
    if FANDANGO_FAKER_AVAILABLE:
        # Seed Fandango
        random.seed(seed)
        # Seed Faker
        Faker.seed(seed)


def _setup_constant_seeding() -> tuple[ConstantProvider, DynamicConstantProvider | None]:
    """Collect constants from SUT, if enabled.

    Returns:
        A tuple of the outermost constant provider and the dynamic constant
        provider (None if dynamic constant seeding is disabled).
    """
    # Use empty provider by default.
    wrapped_provider: ConstantProvider = EmptyConstantProvider()
    # We need to return the provider used for dynamic values separately,
    # because it is later on used to hook up the instrumentation calls.
    dynamic_constant_provider: DynamicConstantProvider | None = None

    if config.configuration.seeding.constant_seeding:
        _LOGGER.info("Collecting static constants from module under test")
        module_names = (
            [config.configuration.module_name]
            if getattr(config.configuration, "module_name", None)
            else None
        )
        constant_pool = collect_static_constants(
            config.configuration.project_path, module_names=module_names
        )
        if len(constant_pool) == 0:
            _LOGGER.info("No constants found")
        else:
            _LOGGER.info("Constants found: %s", len(constant_pool))
            # Probability of 1.0 -> if a value is requested and available -> return it.
            wrapped_provider = DelegatingConstantProvider(constant_pool, wrapped_provider, 1.0)

    if config.configuration.seeding.dynamic_constant_seeding:
        _LOGGER.info("Setting up runtime collection of constants")
        dynamic_constant_provider = DynamicConstantProvider(
            RestrictedConstantPool(max_size=config.configuration.seeding.max_dynamic_pool_size),
            wrapped_provider,
            config.configuration.seeding.seeded_dynamic_values_reuse_probability,
            config.configuration.seeding.max_dynamic_length,
        )
        wrapped_provider = dynamic_constant_provider

    return wrapped_provider, dynamic_constant_provider


def _setup_ml_testing_environment(test_cluster: ModuleTestCluster) -> None:
    """Warm up the ML-testing resources for the given test cluster.

    Args:
        test_cluster: The test cluster passed to the resource loaders.
    """
    # load resources once so they get cached
    tr.get_datatype_mapping()
    tr.get_nparray_function(test_cluster)
    tr.get_constructor_function(test_cluster)


def _verify_config() -> None:
    """Verify the configuration and raise an exception if something is invalid/not supported.

    Raises:
        ConfigurationException: If DynaMOSA is combined with a coverage metric
            other than branch coverage.
    """
    coverage_metrics = config.configuration.statistics_output.coverage_metrics
    uses_non_branch_metric = any(
        metric is not config.CoverageMetric.BRANCH for metric in coverage_metrics
    )
    if config.configuration.algorithm is config.Algorithm.DYNAMOSA and uses_non_branch_metric:
        raise ConfigurationException(
            "DynaMosa currently only supports branch coverage as coverage criterion."
        )


def _setup_and_check() -> tuple[TestCaseExecutor, ModuleTestCluster, ConstantProvider] | None:
    """Load the System Under Test (SUT) i.e. the module that is tested.

    Perform setup and some sanity checks.

    Returns:
        An optional tuple of test-case executor, test cluster, and constant
        provider; None if any setup step failed.
    """
    if not _setup_path():
        return None
    wrapped_constant_provider, dynamic_constant_provider = _setup_constant_seeding()
    subject_properties = _setup_import_hook(dynamic_constant_provider)
    if not _load_sut(subject_properties):
        return None
    if not _setup_report_dir():
        return None

    # Analyzing the SUT should not cause any coverage.
    with subject_properties.instrumentation_tracer.temporarily_disable():
        if (test_cluster := _setup_test_cluster()) is None:
            return None

    # Make alias to make the following lines shorter...
    stop = config.configuration.stopping
    # Both executors are built from the same arguments; only the class differs.
    executor_class: type[TestCaseExecutor] = (
        SubprocessTestCaseExecutor if config.configuration.subprocess else TestCaseExecutor
    )
    executor: TestCaseExecutor = executor_class(
        subject_properties=subject_properties,
        maximum_test_execution_timeout=stop.maximum_test_execution_timeout,
        test_execution_time_per_statement=stop.test_execution_time_per_statement,
    )
    _track_sut_data(subject_properties, test_cluster)
    _setup_random_number_generator()

    if config.configuration.pynguinml.ml_testing_enabled:
        _setup_ml_testing_environment(test_cluster)

    # Detect which LLM strategy is used
    stat.track_output_variable(RuntimeVariable.LLMStrategy, _detect_llm_strategy())
    return executor, test_cluster, wrapped_constant_provider


def _detect_llm_strategy() -> str:
    """Derive a label for the configured LLM strategy.

    Returns:
        The label of the first matching strategy, or an empty string if no
        LLM option is enabled.
    """
    llm_config = config.configuration.large_language_model
    if llm_config.hybrid_initial_population:
        return f"Hybrid-Initial-Population-{llm_config.llm_test_case_percentage}"
    if llm_config.call_llm_on_stall_detection:
        return "LLM-On-Stall-Detection"
    if llm_config.call_llm_for_uncovered_targets:
        return "LLM-For-Initial-Uncovered-Targets"
    if config.configuration.test_case_output.assertion_generation == config.AssertionGenerator.LLM:
        return "LLM-Assertion-Generator"
    return ""


def _track_sut_data(subject_properties: SubjectProperties, test_cluster: ModuleTestCluster) -> None:
    """Track data from the SUT.

    Args:
        subject_properties: The properties of the subject under test.
        test_cluster: the test cluster
    """
    stat.track_output_variable(
        RuntimeVariable.CodeObjects,
        len(subject_properties.existing_code_objects),
    )
    stat.track_output_variable(
        RuntimeVariable.Predicates,
        len(subject_properties.existing_predicates),
    )
    stat.track_output_variable(
        RuntimeVariable.Lines,
        len(subject_properties.existing_lines),
    )
    cyclomatic_complexities: list[int] = [
        code.cfg.cyclomatic_complexity
        for code in subject_properties.existing_code_objects.values()
    ]
    stat.track_output_variable(
        RuntimeVariable.McCabeCodeObject, json.dumps(cyclomatic_complexities)
    )
    test_cluster.track_statistics_values(stat.track_output_variable)

    coverage_metrics = config.configuration.statistics_output.coverage_metrics
    if config.CoverageMetric.BRANCH in coverage_metrics:
        stat.track_output_variable(
            RuntimeVariable.ImportBranchCoverage,
            ff.compute_branch_coverage(
                subject_properties.instrumentation_tracer.import_trace,
                subject_properties,
            ),
        )
    if config.CoverageMetric.LINE in coverage_metrics:
        stat.track_output_variable(
            RuntimeVariable.ImportLineCoverage,
            ff.compute_line_coverage(
                subject_properties.instrumentation_tracer.import_trace,
                subject_properties,
            ),
        )


def _get_coverage_ff_from_algorithm(
    algorithm: GenerationAlgorithm, function_type: type[ff.TestSuiteCoverageFunction]
) -> ff.TestSuiteCoverageFunction:
    """Retrieve the coverage function for a test suite of a given coverage type.

    If several coverage functions of the requested type exist, the last one wins.

    Args:
        algorithm: The test generation strategy
        function_type: the type of coverage function to receive

    Returns:
        The coverage function for a test suite for this run of the given type
    """
    test_suite_coverage_func = None
    for coverage_func in algorithm.test_suite_coverage_functions:
        if isinstance(coverage_func, function_type):
            test_suite_coverage_func = coverage_func
    assert test_suite_coverage_func is not None, (
        "The required coverage function was not initialised"
    )
    return test_suite_coverage_func


def _reload_instrumentation_loader(
    coverage_metrics: set[config.CoverageMetric],
    dynamic_constant_provider: DynamicConstantProvider | None,
    subject_properties: SubjectProperties,
) -> bool:
    """Update the instrumentation finder and reload the module under test.

    Args:
        coverage_metrics: The coverage metrics to instrument for.
        dynamic_constant_provider: The dynamic constant provider, if any.
        subject_properties: The subject properties whose tracer records the reload.

    Returns:
        True if the module was reloaded, False if the reload raised.
    """
    module_name = config.configuration.module_name
    module = importlib.import_module(module_name)

    # Only the first instrumentation finder on the meta path is updated.
    first_finder: InstrumentationFinder | None = next(
        (finder for finder in sys.meta_path if isinstance(finder, InstrumentationFinder)),
        None,
    )
    assert first_finder is not None
    first_finder.update_instrumentation_metrics(
        subject_properties=subject_properties,
        coverage_metrics=coverage_metrics,
        dynamic_constant_provider=dynamic_constant_provider,
    )
    try:
        with subject_properties.instrumentation_tracer:
            importlib.reload(module)
    except Exception as ex:
        _LOGGER.exception("Failed to reload SUT: %s", ex)
        return False
    return True


def _reset_cache_for_result(generation_result: tsc.TestSuiteChromosome) -> None:
    """Invalidate cached results of the suite and all of its test cases.

    Args:
        generation_result: The test suite whose caches are reset.
    """
    generation_result.invalidate_cache()
    for test_case in generation_result.test_case_chromosomes:
        test_case.invalidate_cache()
        test_case.remove_last_execution_result()


def _track_final_metrics(
    algorithm: GenerationAlgorithm,
    executor: TestCaseExecutor,
    generation_result: tsc.TestSuiteChromosome,
    constant_provider: ConstantProvider,
) -> set[config.CoverageMetric] | None:
    """Track the final coverage metrics.

    Re-loads all required instrumentations for metrics that were not already
    calculated and tracked during the result generation.  These metrics are then
    also calculated on the result, which is executed once again with the new
    instrumentation.

    Args:
        algorithm: the used test-generation algorithm
        executor: the testcase executor of the run
        generation_result: the generated testsuite containing assertions
        constant_provider: the constant provider required for the reloading of the
            module

    Returns:
        The set of tracked coverage metrics, including the ones that we optimised
        for or None if the tracking failed.
    """
    output_variables = config.configuration.statistics_output.output_variables
    # Alias for shorter lines
    cov_metrics = config.configuration.statistics_output.coverage_metrics
    metrics_for_reinstrumentation: set[config.CoverageMetric] = set(cov_metrics)

    to_calculate: list[tuple[RuntimeVariable, ff.TestSuiteCoverageFunction]] = []

    add_additional_metrics(
        algorithm=algorithm,
        cov_metrics=cov_metrics,
        executor=executor,
        metrics_for_reinstrumentation=metrics_for_reinstrumentation,
        output_variables=output_variables,
        to_calculate=to_calculate,
    )

    # Assertion Checked Coverage is special...
    if RuntimeVariable.AssertionCheckedCoverage in output_variables:
        metrics_for_reinstrumentation.add(config.CoverageMetric.CHECKED)
        executor.set_instrument(True)
        executor.add_remote_observer(RemoteAssertionExecutionObserver())
        assertion_checked_coverage_ff = ff.TestSuiteAssertionCheckedCoverageFunction(executor)
        to_calculate.append((
            RuntimeVariable.AssertionCheckedCoverage,
            assertion_checked_coverage_ff,
        ))

    # re-instrument the files
    dynamic_constant_provider = None
    if isinstance(constant_provider, DynamicConstantProvider):
        dynamic_constant_provider = constant_provider
    if not _reload_instrumentation_loader(
        metrics_for_reinstrumentation,
        dynamic_constant_provider,
        executor.subject_properties,
    ):
        return None

    # force new execution of the test cases after new instrumentation
    _reset_cache_for_result(generation_result)

    # set value for each newly calculated variable
    for runtime_variable, coverage_ff in to_calculate:
        generation_result.add_coverage_function(coverage_ff)
        _LOGGER.info("Calculating resulting %s", runtime_variable.value)
        stat.track_output_variable(
            runtime_variable, generation_result.get_coverage_for(coverage_ff)
        )

    ass_gen = config.configuration.test_case_output.assertion_generation
    if (
        ass_gen == config.AssertionGenerator.CHECKED_MINIMIZING
        and RuntimeVariable.AssertionCheckedCoverage in output_variables
    ):
        _minimize_assertions(generation_result)

    # Collect other final stats on result
    stat.track_output_variable(RuntimeVariable.FinalLength, generation_result.length())
    stat.track_output_variable(RuntimeVariable.FinalSize, generation_result.size())

    # reset whether to instrument tests and assertions as well as the SUT
    instrument_test = config.CoverageMetric.CHECKED in cov_metrics
    executor.set_instrument(instrument_test)
    return metrics_for_reinstrumentation


def add_additional_metrics(
    *,
    algorithm,
    cov_metrics,
    executor,
    metrics_for_reinstrumentation,
    output_variables,
    to_calculate,
):
    """Register final line and branch coverage for calculation.

    Mutates ``metrics_for_reinstrumentation`` and ``to_calculate`` in place.
    For a metric that was optimised for, the algorithm's existing coverage
    function is reused; for a metric that is only requested as an output
    variable, a new coverage function is created and the metric is added to
    the set that needs re-instrumentation.

    Args:
        algorithm: The used test-generation algorithm.
        cov_metrics: The coverage metrics the search optimised for.
        executor: The test-case executor handed to new coverage functions.
        metrics_for_reinstrumentation: The set of metrics to instrument for;
            extended in place.
        output_variables: The requested output variables.
        to_calculate: The list of (runtime variable, coverage function) pairs;
            extended in place.
    """
    if (
        RuntimeVariable.FinalLineCoverage in output_variables
        and config.CoverageMetric.LINE not in cov_metrics
    ):
        metrics_for_reinstrumentation.add(config.CoverageMetric.LINE)
        line_cov_ff = ff.TestSuiteLineCoverageFunction(executor)
        to_calculate.append((RuntimeVariable.FinalLineCoverage, line_cov_ff))
    elif config.CoverageMetric.LINE in cov_metrics:
        # If we optimised for lines, we still want to get the final line coverage.
        to_calculate.append((
            RuntimeVariable.FinalLineCoverage,
            _get_coverage_ff_from_algorithm(algorithm, ff.TestSuiteLineCoverageFunction),
        ))

    if (
        RuntimeVariable.FinalBranchCoverage in output_variables
        and config.CoverageMetric.BRANCH not in cov_metrics
    ):
        metrics_for_reinstrumentation.add(config.CoverageMetric.BRANCH)
        branch_cov_ff = ff.TestSuiteBranchCoverageFunction(executor)
        to_calculate.append((RuntimeVariable.FinalBranchCoverage, branch_cov_ff))
    elif config.CoverageMetric.BRANCH in cov_metrics:
        # If we optimised for branches, we still want to get the final branch coverage.
        to_calculate.append((
            RuntimeVariable.FinalBranchCoverage,
            _get_coverage_ff_from_algorithm(algorithm, ff.TestSuiteBranchCoverageFunction),
        ))


def _run() -> ReturnCode:  # noqa: C901
    """Run the regular generation pipeline.

    The steps are: setup, test generation, minimization, assertion generation,
    final metrics tracking, export, coverage report, and statistics output.

    Returns:
        The return code describing the outcome of the run.
    """
    _verify_config()
    if (setup_result := _setup_and_check()) is None:
        return ReturnCode.SETUP_FAILED
    executor, test_cluster, constant_provider = setup_result
    # traces slices for test cases after execution
    coverage_metrics = config.configuration.statistics_output.coverage_metrics
    if config.CoverageMetric.CHECKED in coverage_metrics:
        executor.add_remote_observer(RemoteStatementSlicingObserver())

    algorithm: GenerationAlgorithm = _instantiate_test_generation_strategy(
        executor, test_cluster, constant_provider
    )
    _LOGGER.info("Start generating test cases")
    generation_result = algorithm.generate_tests()
    if algorithm.resources_left():
        _LOGGER.info("Algorithm stopped before using all resources.")
    else:
        _LOGGER.info("Stopping condition reached")
        for stop in algorithm.stopping_conditions:
            _LOGGER.info("%s", stop)
    _LOGGER.info("Stop generating test cases")

    # Executions that happen after this point should not influence the
    # search statistics
    executor.clear_observers()
    executor.clear_remote_observers()

    _track_search_metrics(algorithm, generation_result, coverage_metrics)
    try:
        _LOGGER.info("Minimizing test cases")
        _minimize(generation_result, algorithm)
    except Exception as ex:
        # Minimization is best effort; the unminimized result is still usable.
        _LOGGER.exception("Minimization failed: %s", ex)
    _generate_assertions(executor, generation_result, test_cluster)
    if (
        tracked_metrics := _track_final_metrics(
            algorithm,
            executor,
            generation_result,
            constant_provider,
        )
    ) is None:
        return ReturnCode.FINAL_METRICS_TRACKING_FAILED

    executor.subject_properties.instrumentation_tracer.disable()

    # Export the generated test suites
    if config.configuration.test_case_output.export_strategy == config.ExportStrategy.PY_TEST:
        try:
            _export_chromosome(generation_result)
        except Exception as ex:
            _LOGGER.exception("Export to PyTest failed: %s", ex)

    if config.configuration.statistics_output.create_coverage_report:
        try:
            coverage_report = get_coverage_report(
                generation_result,
                executor.subject_properties,
                tracked_metrics,
            )
            render_coverage_report(
                coverage_report,
                Path(config.configuration.statistics_output.report_dir) / "cov_report.html",
                datetime.datetime.now(),  # noqa: DTZ005
            )
            render_xml_coverage_report(
                coverage_report,
                Path(config.configuration.statistics_output.report_dir) / "cov_report.xml",
                datetime.datetime.now(),  # noqa: DTZ005
            )
        except Exception as e:  # noqa: BLE001
            _LOGGER.warning(
                "Failed to create coverage report: %s. ",
                e,
            )
    _collect_miscellaneous_statistics(test_cluster)
    if not stat.write_statistics():
        _LOGGER.error("Failed to write statistics data")
    if generation_result.size() == 0:
        # not able to generate one test case
        return ReturnCode.NO_TESTS_GENERATED
    return ReturnCode.OK


def _run_llm() -> ReturnCode:
    """Generate tests by prompting an LLM with the source of the module under test.

    The code extracted from the response is written to ``test_<module>.py`` in
    the configured output path.

    Returns:
        ``NO_TESTS_GENERATED`` if the model gave no response, ``OK`` otherwise.
    """

    def load_sut_code() -> str:
        project_path = Path(config.configuration.project_path)
        module_name = config.configuration.module_name.replace(".", "/") + ".py"
        sut_file = project_path / module_name
        # Python source files are UTF-8 by default; do not depend on the locale.
        return sut_file.read_text(encoding="utf-8")

    model = LLM.create(LLMProvider.OPENAI)
    user_prompt = (
        "Generate test cases for the following Python code:\n\n"
        "```\n"
        f"{load_sut_code()}"
        "\n```\n"
    )
    response = model.chat(user_prompt)
    if not response:
        return ReturnCode.NO_TESTS_GENERATED
    code = extract_code(response)
    module_name = config.configuration.module_name.replace(".", "_")
    target_file = (
        Path(config.configuration.test_case_output.output_path).resolve()
        / f"test_{module_name}.py"
    )
    # The output directory is not guaranteed to exist yet.
    target_file.parent.mkdir(parents=True, exist_ok=True)
    target_file.write_text(code, encoding="utf-8")
    return ReturnCode.OK


def _check_coverage(original_coverages: list[float], minimized_coverages: list[float]) -> bool:
    """Check if the coverage after minimization is the same as before.

    The values are compared pairwise with ``math.isclose``.

    Args:
        original_coverages: The coverages before minimization
        minimized_coverages: The coverages after minimization

    Returns:
        If the coverage is still the same
    """
    is_same = all(map(math.isclose, original_coverages, minimized_coverages))
    if is_same:
        _LOGGER.info("Coverage after minimization is the same as before: %s", minimized_coverages)
    else:
        _LOGGER.warning(
            "Coverage after minimization changed from %s to %s",
            original_coverages,
            minimized_coverages,
        )
    return is_same


def _minimize(
    generation_result: tsc.TestSuiteChromosome,
    algorithm: GenerationAlgorithm | None = None,
) -> None:
    """Post-process the generated test suite in place.

    Statements after exceptions are always truncated.  If post-processing is
    enabled, the configured minimization strategy is applied; if that changes
    any coverage value, the unminimized test cases are restored.

    Args:
        generation_result: The test suite to minimize; modified in place.
        algorithm: The algorithm providing the coverage functions used to guard
            the minimization.  Without it only unused statements are removed.
    """
    truncation = pp.ExceptionTruncation()
    generation_result.accept(truncation)

    if not config.configuration.test_case_output.post_process:
        return

    unused_vars_minimizer = pp.UnusedStatementsTestCaseVisitor()
    minimization = config.configuration.test_case_output.minimization
    minimization_strategy = minimization.test_case_minimization_strategy

    if minimization_strategy != config.MinimizationStrategy.NONE and algorithm is not None:
        fitness_functions = algorithm.test_suite_coverage_functions
        assert len(fitness_functions) > 0, "No test suite coverage functions available"

        original_coverages = [
            generation_result.get_coverage_for(fitness_function)
            for fitness_function in fitness_functions
        ]

        # Save a copy of the original test suite before minimization
        original_test_suite = generation_result.clone()

        # Select the appropriate minimization visitor based on the strategy
        iterative_minimizer: pp.IterativeMinimizationVisitor
        if minimization.test_case_minimization_direction == config.MinimizationDirection.FORWARD:
            iterative_minimizer = pp.ForwardIterativeMinimizationVisitor(fitness_functions)
        else:
            iterative_minimizer = pp.BackwardIterativeMinimizationVisitor(fitness_functions)

        # Check if we should use the combined minimization approach
        if minimization_strategy == config.MinimizationStrategy.COMBINED:
            combined_minimizer = pp.CombinedMinimizationVisitor(fitness_functions)
            generation_result.accept(combined_minimizer)
            _LOGGER.info(
                "Combined minimization removed %d statement(s)",
                combined_minimizer.removed_statements,
            )
        else:
            # Apply traditional test case minimization strategies
            test_case_minimizer = pp.TestCasePostProcessor([
                unused_vars_minimizer,
                iterative_minimizer,
            ])
            generation_result.accept(test_case_minimizer)
            _LOGGER.info(
                "Removed %d statement(s) from test cases using %s minimization",
                iterative_minimizer.removed_statements,
                minimization_strategy.value,
            )

            # Apply test suite minimization to remove redundant test cases.
            # The strategy cannot be COMBINED and SUITE at once, so running this
            # check only on the non-combined path is equivalent to running it
            # unconditionally.
            if minimization_strategy == config.MinimizationStrategy.SUITE:
                test_suite_minimizer = pp.TestSuiteMinimizationVisitor(fitness_functions)
                generation_result.accept(test_suite_minimizer)
                if test_suite_minimizer.removed_test_cases > 0:
                    _LOGGER.info(
                        "Removed %d test case(s) from test suite during minimization",
                        test_suite_minimizer.removed_test_cases,
                    )

        minimized_coverages = [
            generation_result.get_coverage_for(fitness_function)
            for fitness_function in fitness_functions
        ]

        if not _check_coverage(original_coverages, minimized_coverages):
            # Restore unminimized test suite
            _LOGGER.info("Restoring unminimized test suite due to coverage loss")
            # Replace the current test suite with the original one
            generation_result.test_case_chromosomes = [
                test.clone() for test in original_test_suite.test_case_chromosomes
            ]
            # Mark the test suite as changed
            generation_result.changed = True

            # Verify that coverage is restored.  The coverage has to be queried
            # per coverage function, not for the whole list at once.
            restored_coverages = [
                generation_result.get_coverage_for(fitness_function)
                for fitness_function in fitness_functions
            ]
            _LOGGER.info("Coverage after restoration: %s", restored_coverages)
    else:
        # NOTE(review): this fallback is paired with the strategy check above;
        # confirm against the upstream layout.
        unused_primitives_removal = pp.TestCasePostProcessor([unused_vars_minimizer])
        generation_result.accept(unused_primitives_removal)

    # Remove empty test cases after minimization
    # NOTE(review): assumed to run only when post-processing is enabled; confirm
    # against the upstream layout.
    empty_test_case_remover = pp.EmptyTestCaseRemover()
    generation_result.accept(empty_test_case_remover)


def _minimize_assertions(generation_result: tsc.TestSuiteChromosome):
    """Minimize assertions based on checked coverage and track the result.

    Args:
        generation_result: The test suite whose assertions are minimized.
    """
    _LOGGER.info("Minimizing assertions based on checked coverage")
    assertion_minimizer = pp.AssertionMinimization()
    generation_result.accept(assertion_minimizer)
    stat.track_output_variable(
        RuntimeVariable.Assertions, len(assertion_minimizer.remaining_assertions)
    )
    stat.track_output_variable(
        RuntimeVariable.DeletedAssertions,
        len(assertion_minimizer.deleted_assertions),
    )


# Maps each higher-order mutation strategy to the factory that builds it from
# the configured mutation order.
_strategies: dict[config.MutationStrategy, Callable[[int], ms.HOMStrategy]] = {
    config.MutationStrategy.FIRST_TO_LAST: ms.FirstToLastHOMStrategy,
    config.MutationStrategy.BETWEEN_OPERATORS: ms.BetweenOperatorsHOMStrategy,
    config.MutationStrategy.RANDOM: ms.RandomHOMStrategy,
    config.MutationStrategy.EACH_CHOICE: ms.EachChoiceHOMStrategy,
}


def _setup_mutant_generator() -> mu.Mutator:
    """Create the mutator for the configured mutation strategy.

    Returns:
        A first-order mutator, or a higher-order mutator with the configured
        strategy and order.

    Raises:
        ConfigurationException: If the mutation order is not positive or the
            mutation strategy is unknown.
    """
    operators: list[type[MutationOperator]] = [
        *mo.standard_operators,
        *mo.experimental_operators,
    ]

    mutation_strategy = config.configuration.test_case_output.mutation_strategy

    if mutation_strategy == config.MutationStrategy.FIRST_ORDER_MUTANTS:
        return mu.FirstOrderMutator(operators)

    order = config.configuration.test_case_output.mutation_order

    if order <= 0:
        raise ConfigurationException("Mutation order should be > 0.")

    if mutation_strategy in _strategies:
        hom_strategy = _strategies[mutation_strategy](order)
        return mu.HighOrderMutator(operators, hom_strategy=hom_strategy)

    raise ConfigurationException("No suitable mutation strategy found.")


def _setup_mutation_analysis_assertion_generator(
    executor: TestCaseExecutor,
) -> ag.MutationAnalysisAssertionGenerator:
    """Build the mutation-analysis assertion generator for the module under test.

    Args:
        executor: The executor handed to the assertion generator.

    Returns:
        The LLM-based variant if LLM assertion generation is configured, the
        plain mutation-analysis assertion generator otherwise.
    """
    _LOGGER.info("Setup mutation generator")
    mutant_generator = _setup_mutant_generator()

    _LOGGER.info("Import module %s", config.configuration.module_name)
    module = importlib.import_module(config.configuration.module_name)

    _LOGGER.info("Build AST for %s", module.__name__)
    module_source_code = inspect.getsource(module)
    module_ast = ParentNodeTransformer.create_ast(module_source_code)

    _LOGGER.info("Mutate module %s", module.__name__)
    mutation_controller = MutationController(mutant_generator, module_ast, module)

    assertion_generator: ag.MutationAnalysisAssertionGenerator
    if config.configuration.test_case_output.assertion_generation is config.AssertionGenerator.LLM:
        assertion_generator = lag.MutationAnalysisLLMAssertionGenerator(
            executor, mutation_controller
        )
    else:
        assertion_generator = ag.MutationAnalysisAssertionGenerator(executor, mutation_controller)

    _LOGGER.info("Generated %d mutants", mutation_controller.mutant_count())
    return assertion_generator


def _generate_assertions(
    executor: TestCaseExecutor,
    generation_result: tsc.TestSuiteChromosome,
    test_cluster: ModuleTestCluster,
) -> None:
    """Add assertions to the test suite with the configured assertion generator.

    Args:
        executor: The executor handed to the assertion generators.
        generation_result: The test suite to add assertions to.
        test_cluster: The test cluster handed to the LLM assertion generator.
    """
    ass_gen = config.configuration.test_case_output.assertion_generation
    if ass_gen == config.AssertionGenerator.NONE:
        return

    _LOGGER.info("Start generating assertions")
    generator: cv.ChromosomeVisitor
    if ass_gen == config.AssertionGenerator.LLM:
        # The LLM generator runs first; mutation analysis is applied afterwards.
        generation_result.accept(lag.LLMAssertionGenerator(test_cluster))
        generator = _setup_mutation_analysis_assertion_generator(executor)
    elif ass_gen == config.AssertionGenerator.MUTATION_ANALYSIS:
        generator = _setup_mutation_analysis_assertion_generator(executor)
    else:
        generator = ag.AssertionGenerator(executor)
    generation_result.accept(generator)


def _track_search_metrics(
    algorithm: GenerationAlgorithm,
    generation_result: tsc.TestSuiteChromosome,
    coverage_metrics: list[config.CoverageMetric],
) -> None:
    """Track multiple set coverage metrics of the generated test suites.

    This possibly re-executes the test suites.

    Args:
        algorithm: The test generation strategy
        generation_result: The resulting chromosome of the generation strategy
        coverage_metrics: The selected coverage metrics to guide the search
    """
    for metric, runtime, fitness_type in [
        (
            config.CoverageMetric.LINE,
            RuntimeVariable.LineCoverage,
            ff.TestSuiteLineCoverageFunction,
        ),
        (
            config.CoverageMetric.BRANCH,
            RuntimeVariable.BranchCoverage,
            ff.TestSuiteBranchCoverageFunction,
        ),
        (
            config.CoverageMetric.CHECKED,
            RuntimeVariable.StatementCheckedCoverage,
            ff.TestSuiteStatementCheckedCoverageFunction,
        ),
    ]:
        if metric in coverage_metrics:
            coverage_function: ff.TestSuiteCoverageFunction = _get_coverage_ff_from_algorithm(
                algorithm, cast("type", fitness_type)
            )
            stat.track_output_variable(
                runtime, generation_result.get_coverage_for(coverage_function)
            )
    # Write overall coverage data of result
    stat.current_individual(generation_result)


def _instantiate_test_generation_strategy(
    executor: TestCaseExecutor,
    test_cluster: ModuleTestCluster,
    constant_provider: ConstantProvider,
) -> GenerationAlgorithm:
    """Create the search algorithm via the generation algorithm factory.

    Args:
        executor: The test-case executor.
        test_cluster: The test cluster of the module under test.
        constant_provider: The constant provider.

    Returns:
        The search algorithm returned by the factory.
    """
    factory = gaf.TestSuiteGenerationAlgorithmFactory(executor, test_cluster, constant_provider)
    return factory.get_search_algorithm()


def _collect_miscellaneous_statistics(test_cluster: ModuleTestCluster) -> None:
    """Track run-identifying values and flush the remaining runtime variables.

    Args:
        test_cluster: The test cluster whose statistics are logged.
    """
    test_cluster.log_cluster_statistics()
    stat.track_output_variable(RuntimeVariable.TargetModule, config.configuration.module_name)
    stat.track_output_variable(RuntimeVariable.RandomSeed, randomness.RNG.get_seed())
    stat.track_output_variable(
        RuntimeVariable.ConfigurationId,
        config.configuration.statistics_output.configuration_id,
    )
    stat.track_output_variable(RuntimeVariable.RunId, config.configuration.statistics_output.run_id)
    stat.track_output_variable(
        RuntimeVariable.ProjectName, config.configuration.statistics_output.project_name
    )
    for runtime_variable, value in stat.variables_generator:
        stat.set_output_variable_for_runtime_variable(runtime_variable, value)


def _export_chromosome(
    chromosome: chrom.Chromosome,
    file_name_suffix: str = "",
) -> None:
    """Export the given chromosome.

    The target file is ``test_<module><suffix>.py`` in the configured output
    path, where dots in the module name are replaced by underscores.

    Args:
        chromosome: the chromosome to export.
        file_name_suffix: Suffix that can be added to the file name to distinguish
            between different results e.g., failing and succeeding test cases.
    """
    module_name = config.configuration.module_name.replace(".", "_")
    target_file = (
        Path(config.configuration.test_case_output.output_path).resolve()
        / f"test_{module_name}{file_name_suffix}.py"
    )
    store_call_return = (
        config.configuration.test_case_output.assertion_generation is config.AssertionGenerator.LLM
    )
    export_visitor = export.PyTestChromosomeToAstVisitor(store_call_return=store_call_return)

    chromosome.accept(export_visitor)

    export.save_module_to_file(
        export_visitor.to_module(),
        target_file,
        format_with_black=config.configuration.test_case_output.format_with_black,
    )
    _LOGGER.info("Written %i test cases to %s", chromosome.size(), target_file)