# Source code for pynguin.generator

#  This file is part of Pynguin.
#
#  SPDX-FileCopyrightText: 2019–2024 Pynguin Contributors
#
#  SPDX-License-Identifier: MIT
#
"""Pynguin is an automated unit test generation framework for Python.

The framework generates unit tests for a given Python module.  For this it
supports various approaches, such as a random approach, similar to Randoop or a
whole-suite approach, based on a genetic algorithm, as implemented in EvoSuite.  The
framework allows exporting test suites in various styles, i.e., using the `unittest`
library from the Python standard library or tests in the style used by the PyTest
framework.

Pynguin is supposed to be used as a standalone command-line application, but it
can also be used as a library by passing a configuration to `set_configuration`
and then calling `run_pynguin`.
"""
from __future__ import annotations

import datetime
import enum
import importlib
import inspect
import json
import logging
import sys
import threading

from pathlib import Path
from typing import TYPE_CHECKING
from typing import cast

import pynguin.assertion.assertiongenerator as ag
import pynguin.assertion.mutation_analysis.mutators as mu
import pynguin.assertion.mutation_analysis.operators as mo
import pynguin.assertion.mutation_analysis.strategies as ms
import pynguin.configuration as config
import pynguin.ga.chromosome as chrom
import pynguin.ga.chromosomevisitor as cv
import pynguin.ga.computations as ff
import pynguin.ga.generationalgorithmfactory as gaf
import pynguin.ga.postprocess as pp
import pynguin.ga.testsuitechromosome as tsc
import pynguin.utils.statistics.statistics as stat

from pynguin.analyses.constants import ConstantProvider
from pynguin.analyses.constants import DelegatingConstantProvider
from pynguin.analyses.constants import DynamicConstantProvider
from pynguin.analyses.constants import EmptyConstantProvider
from pynguin.analyses.constants import RestrictedConstantPool
from pynguin.analyses.constants import collect_static_constants
from pynguin.analyses.module import generate_test_cluster
from pynguin.assertion.mutation_analysis.transformer import ParentNodeTransformer
from pynguin.instrumentation.machinery import InstrumentationFinder
from pynguin.instrumentation.machinery import install_import_hook
from pynguin.slicer.statementslicingobserver import StatementSlicingObserver
from pynguin.testcase import export
from pynguin.testcase.execution import AssertionExecutionObserver
from pynguin.testcase.execution import ExecutionTracer
from pynguin.testcase.execution import TestCaseExecutor
from pynguin.utils import randomness
from pynguin.utils.exceptions import ConfigurationException
from pynguin.utils.report import get_coverage_report
from pynguin.utils.report import render_coverage_report
from pynguin.utils.report import render_xml_coverage_report
from pynguin.utils.statistics.runtimevariable import RuntimeVariable


if TYPE_CHECKING:
    from collections.abc import Callable

    from pynguin.analyses.module import ModuleTestCluster
    from pynguin.assertion.mutation_analysis.operators.base import MutationOperator
    from pynguin.ga.algorithms.generationalgorithm import GenerationAlgorithm


@enum.unique
class ReturnCode(enum.IntEnum):
    """Return codes for Pynguin to signal result."""

    OK = 0
    """Symbolises that the execution ended as expected."""

    SETUP_FAILED = 1
    """Symbolises that the execution failed in the setup phase."""

    NO_TESTS_GENERATED = 2
    """Symbolises that no test could be generated."""
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
def set_configuration(configuration: config.Configuration) -> None:
    """Initialises the test generator with the given configuration.

    The configuration is stored in the module-global ``config.configuration``,
    which the remaining functions of this module read from.

    Args:
        configuration: The configuration to use.
    """
    config.configuration = configuration
def run_pynguin() -> ReturnCode:
    """Run the test generation.

    The result of the test generation is indicated by the resulting ReturnCode.

    Returns:
        See ReturnCode.

    Raises:
        ConfigurationException: In case the configuration is illegal
    """
    try:
        _LOGGER.info("Start Pynguin Test Generation…")
        return _run()
    finally:
        # Logged on every exit path, including when ``_run`` raises.
        _LOGGER.info("Stop Pynguin Test Generation…")
def _setup_test_cluster() -> ModuleTestCluster | None:
    """Build the test cluster for the configured module.

    Returns:
        The generated test cluster, or None if it reports zero accessible
        objects under test.
    """
    test_cluster = generate_test_cluster(
        config.configuration.module_name,
        config.configuration.type_inference.type_inference_strategy,
    )
    if test_cluster.num_accessible_objects_under_test() == 0:
        _LOGGER.error("SUT contains nothing we can test.")
        return None
    return test_cluster


def _setup_path() -> bool:
    """Set up the run-time path.

    Inserts the configured project path at the front of ``sys.path``.

    Returns:
        True if the project path is an existing directory and was inserted,
        False otherwise.
    """
    if not Path(config.configuration.project_path).is_dir():
        _LOGGER.error(
            "%s is not a valid project path", config.configuration.project_path
        )
        return False
    _LOGGER.debug("Setting up path for %s", config.configuration.project_path)
    sys.path.insert(0, config.configuration.project_path)
    return True


def _setup_import_hook(
    coverage_metrics: set[config.CoverageMetric],
    dynamic_constant_provider: DynamicConstantProvider | None,
) -> ExecutionTracer:
    """Create an execution tracer and install the import hook for the SUT.

    Args:
        coverage_metrics: The coverage metrics passed on to the import hook
        dynamic_constant_provider: The provider for dynamic constants, if any

    Returns:
        The newly created execution tracer
    """
    _LOGGER.debug("Setting up instrumentation for %s", config.configuration.module_name)
    tracer = ExecutionTracer()
    install_import_hook(
        config.configuration.module_name,
        tracer,
        coverage_metrics=coverage_metrics,
        dynamic_constant_provider=dynamic_constant_provider,
    )
    return tracer


def _load_sut(tracer: ExecutionTracer) -> bool:
    """Import the module under test.

    Args:
        tracer: The execution tracer used while importing

    Returns:
        True if the import succeeded, False if an ImportError was raised.
    """
    try:
        # We need to set the current thread ident so the import trace is recorded.
        tracer.current_thread_identifier = threading.current_thread().ident
        importlib.import_module(config.configuration.module_name)
    except ImportError as ex:
        # A module could not be imported because some dependencies
        # are missing or it is malformed
        _LOGGER.exception("Failed to load SUT: %s", ex)
        return False
    return True


def _setup_report_dir() -> bool:
    """Create the report directory, if it is required.

    Returns:
        False if the directory is required but could not be created,
        True otherwise.
    """
    # Report dir only needs to be created when statistics or coverage report is enabled.
    if (
        config.configuration.statistics_output.statistics_backend
        != config.StatisticsBackend.NONE
        or config.configuration.statistics_output.create_coverage_report
    ):
        report_dir = Path(config.configuration.statistics_output.report_dir).absolute()
        try:
            report_dir.mkdir(parents=True, exist_ok=True)
        except OSError:
            # FileNotFoundError is a subclass of OSError, so it is covered here.
            _LOGGER.exception(
                "Cannot create report dir %s",
                config.configuration.statistics_output.report_dir,
            )
            return False
    return True


def _setup_random_number_generator() -> None:
    """Setup RNG."""
    _LOGGER.info("Using seed %d", config.configuration.seeding.seed)
    randomness.RNG.seed(config.configuration.seeding.seed)


def _setup_constant_seeding() -> (
    tuple[ConstantProvider, DynamicConstantProvider | None]
):
    """Collect constants from SUT, if enabled.

    Returns:
        A tuple of the outermost constant provider and the dynamic constant
        provider; the latter is None if dynamic constant seeding is disabled.
    """
    # Use empty provider by default.
    wrapped_provider: ConstantProvider = EmptyConstantProvider()
    # We need to return the provider used for dynamic values separately,
    # because it is later on used to hook up the instrumentation calls.
    dynamic_constant_provider: DynamicConstantProvider | None = None
    if config.configuration.seeding.constant_seeding:
        _LOGGER.info("Collecting static constants from module under test")
        constant_pool = collect_static_constants(config.configuration.project_path)
        if len(constant_pool) == 0:
            _LOGGER.info("No constants found")
        else:
            _LOGGER.info("Constants found: %s", len(constant_pool))
            # Probability of 1.0 -> if a value is requested and available -> return it.
            wrapped_provider = DelegatingConstantProvider(
                constant_pool, wrapped_provider, 1.0
            )

    if config.configuration.seeding.dynamic_constant_seeding:
        _LOGGER.info("Setting up runtime collection of constants")
        dynamic_constant_provider = DynamicConstantProvider(
            RestrictedConstantPool(
                max_size=config.configuration.seeding.max_dynamic_pool_size
            ),
            wrapped_provider,
            config.configuration.seeding.seeded_dynamic_values_reuse_probability,
            config.configuration.seeding.max_dynamic_length,
        )
        wrapped_provider = dynamic_constant_provider

    return wrapped_provider, dynamic_constant_provider


def _setup_and_check() -> (
    tuple[TestCaseExecutor, ModuleTestCluster, ConstantProvider] | None
):
    """Load the System Under Test (SUT) i.e. the module that is tested.

    Perform setup and some sanity checks.

    Returns:
        A tuple of test-case executor, test cluster, and constant provider, or
        None if one of the setup steps failed.
    """
    if not _setup_path():
        return None
    wrapped_constant_provider, dynamic_constant_provider = _setup_constant_seeding()
    tracer = _setup_import_hook(
        set(config.configuration.statistics_output.coverage_metrics),
        dynamic_constant_provider,
    )
    if not _load_sut(tracer):
        return None
    if not _setup_report_dir():
        return None

    # Analyzing the SUT should not cause any coverage.
    tracer.disable()
    if (test_cluster := _setup_test_cluster()) is None:
        return None
    tracer.enable()

    # Make alias to make the following lines shorter...
    stop = config.configuration.stopping
    executor = TestCaseExecutor(
        tracer,
        maximum_test_execution_timeout=stop.maximum_test_execution_timeout,
        test_execution_time_per_statement=stop.test_execution_time_per_statement,
    )
    _track_sut_data(tracer, test_cluster)
    _setup_random_number_generator()
    return executor, test_cluster, wrapped_constant_provider


def _track_sut_data(tracer: ExecutionTracer, test_cluster: ModuleTestCluster) -> None:
    """Track data from the SUT.

    Args:
        tracer: the execution tracer
        test_cluster: the test cluster
    """
    stat.track_output_variable(
        RuntimeVariable.CodeObjects,
        len(tracer.get_subject_properties().existing_code_objects),
    )
    stat.track_output_variable(
        RuntimeVariable.Predicates,
        len(tracer.get_subject_properties().existing_predicates),
    )
    stat.track_output_variable(
        RuntimeVariable.Lines,
        len(tracer.get_subject_properties().existing_lines),
    )
    cyclomatic_complexities: list[int] = [
        code.original_cfg.cyclomatic_complexity
        for code in tracer.get_subject_properties().existing_code_objects.values()
    ]
    stat.track_output_variable(
        RuntimeVariable.McCabeCodeObject, json.dumps(cyclomatic_complexities)
    )
    test_cluster.track_statistics_values(stat.track_output_variable)
    if (
        config.CoverageMetric.BRANCH
        in config.configuration.statistics_output.coverage_metrics
    ):
        stat.track_output_variable(
            RuntimeVariable.ImportBranchCoverage,
            ff.compute_branch_coverage(
                tracer.import_trace, tracer.get_subject_properties()
            ),
        )
    if (
        config.CoverageMetric.LINE
        in config.configuration.statistics_output.coverage_metrics
    ):
        stat.track_output_variable(
            RuntimeVariable.ImportLineCoverage,
            ff.compute_line_coverage(
                tracer.import_trace, tracer.get_subject_properties()
            ),
        )


def _get_coverage_ff_from_algorithm(
    algorithm: GenerationAlgorithm, function_type: type[ff.TestSuiteCoverageFunction]
) -> ff.TestSuiteCoverageFunction:
    """Retrieve the coverage function for a test suite of a given coverage type.

    If several coverage functions of the algorithm match the type, the last
    matching one is returned.

    Args:
        algorithm: The test generation strategy
        function_type: the type of coverage function to receive

    Returns:
        The coverage function for a test suite for this run of the given type
    """
    test_suite_coverage_func = None
    for coverage_func in algorithm.test_suite_coverage_functions:
        if isinstance(coverage_func, function_type):
            test_suite_coverage_func = coverage_func
    assert (
        test_suite_coverage_func
    ), "The required coverage function was not initialised"
    return test_suite_coverage_func


def _reload_instrumentation_loader(
    coverage_metrics: set[config.CoverageMetric],
    dynamic_constant_provider: DynamicConstantProvider | None,
    tracer: ExecutionTracer,
) -> None:
    """Update the installed instrumentation finder and reload the SUT module.

    Args:
        coverage_metrics: The coverage metrics to instrument for
        dynamic_constant_provider: The provider for dynamic constants, if any
        tracer: The execution tracer handed to the instrumentation finder
    """
    module_name = config.configuration.module_name
    module = importlib.import_module(module_name)
    tracer.current_thread_identifier = threading.current_thread().ident
    # Only the first instrumentation finder on the meta path is updated.
    first_finder: InstrumentationFinder | None = None
    for finder in sys.meta_path:
        if isinstance(finder, InstrumentationFinder):
            first_finder = finder
            break
    assert first_finder is not None
    first_finder.update_instrumentation_metrics(
        tracer=tracer,
        coverage_metrics=coverage_metrics,
        dynamic_constant_provider=dynamic_constant_provider,
    )
    importlib.reload(module)


def _reset_cache_for_result(generation_result: tsc.TestSuiteChromosome) -> None:
    """Invalidate the caches of a test suite and of all its test cases.

    Additionally removes the last execution result of every test case.

    Args:
        generation_result: The test suite whose caches shall be reset
    """
    generation_result.invalidate_cache()
    for test_case in generation_result.test_case_chromosomes:
        test_case.invalidate_cache()
        test_case.remove_last_execution_result()


def _track_final_metrics(
    algorithm,
    executor: TestCaseExecutor,
    generation_result: tsc.TestSuiteChromosome,
    constant_provider: ConstantProvider,
) -> set[config.CoverageMetric]:
    """Track the final coverage metrics.

    Re-loads all required instrumentations for metrics that were not already
    calculated and tracked during the result generation.
    These metrics are then also calculated on the result, which is executed
    once again with the new instrumentation.

    Args:
        algorithm: the used test-generation algorithm
        executor: the testcase executor of the run
        generation_result: the generated testsuite containing assertions
        constant_provider: the constant provider required for the
            reloading of the module

    Returns:
        The set of tracked coverage metrics, including the ones that we optimised for.
    """
    output_variables = config.configuration.statistics_output.output_variables
    # Alias for shorter lines
    cov_metrics = config.configuration.statistics_output.coverage_metrics
    metrics_for_reinstrumentation: set[config.CoverageMetric] = set(cov_metrics)
    to_calculate: list[tuple[RuntimeVariable, ff.TestSuiteCoverageFunction]] = []

    add_additional_metrics(
        algorithm=algorithm,
        cov_metrics=cov_metrics,
        executor=executor,
        metrics_for_reinstrumentation=metrics_for_reinstrumentation,
        output_variables=output_variables,
        to_calculate=to_calculate,
    )

    # Assertion Checked Coverage is special...
    if RuntimeVariable.AssertionCheckedCoverage in output_variables:
        metrics_for_reinstrumentation.add(config.CoverageMetric.CHECKED)
        executor.set_instrument(True)
        executor.add_observer(AssertionExecutionObserver(executor.tracer))
        assertion_checked_coverage_ff = ff.TestSuiteAssertionCheckedCoverageFunction(
            executor
        )
        to_calculate.append(
            (RuntimeVariable.AssertionCheckedCoverage, assertion_checked_coverage_ff)
        )

    # re-instrument the files
    dynamic_constant_provider = None
    if isinstance(constant_provider, DynamicConstantProvider):
        dynamic_constant_provider = constant_provider
    _reload_instrumentation_loader(
        metrics_for_reinstrumentation, dynamic_constant_provider, executor.tracer
    )

    # force new execution of the test cases after new instrumentation
    _reset_cache_for_result(generation_result)

    # set value for each newly calculated variable
    for runtime_variable, coverage_ff in to_calculate:
        generation_result.add_coverage_function(coverage_ff)
        _LOGGER.info("Calculating resulting %s", runtime_variable.value)
        stat.track_output_variable(
            runtime_variable, generation_result.get_coverage_for(coverage_ff)
        )

    ass_gen = config.configuration.test_case_output.assertion_generation
    if (
        ass_gen == config.AssertionGenerator.CHECKED_MINIMIZING
        and RuntimeVariable.AssertionCheckedCoverage in output_variables
    ):
        _minimize_assertions(generation_result)

    # Collect other final stats on result
    stat.track_output_variable(RuntimeVariable.FinalLength, generation_result.length())
    stat.track_output_variable(RuntimeVariable.FinalSize, generation_result.size())

    # reset whether to instrument tests and assertions as well as the SUT
    instrument_test = config.CoverageMetric.CHECKED in cov_metrics
    executor.set_instrument(instrument_test)
    return metrics_for_reinstrumentation


def add_additional_metrics(
    *,
    algorithm,
    cov_metrics,
    executor,
    metrics_for_reinstrumentation,
    output_variables,
    to_calculate,
):
    """Register the final line and branch coverage computations.

    Mutates ``metrics_for_reinstrumentation`` and ``to_calculate`` in place.

    Args:
        algorithm: the used test-generation algorithm
        cov_metrics: the coverage metrics the search was configured with
        executor: the testcase executor of the run
        metrics_for_reinstrumentation: the set of metrics to re-instrument for;
            extended by metrics that were requested but not optimised for
        output_variables: the requested output variables
        to_calculate: the list of (runtime variable, coverage function) pairs
            to compute; extended by this function
    """
    if (
        RuntimeVariable.FinalLineCoverage in output_variables
        and config.CoverageMetric.LINE not in cov_metrics
    ):
        metrics_for_reinstrumentation.add(config.CoverageMetric.LINE)
        line_cov_ff = ff.TestSuiteLineCoverageFunction(executor)
        to_calculate.append((RuntimeVariable.FinalLineCoverage, line_cov_ff))
    elif config.CoverageMetric.LINE in cov_metrics:
        # If we optimised for lines, we still want to get the final line coverage.
        to_calculate.append(
            (
                RuntimeVariable.FinalLineCoverage,
                _get_coverage_ff_from_algorithm(
                    algorithm, ff.TestSuiteLineCoverageFunction
                ),
            )
        )
    if (
        RuntimeVariable.FinalBranchCoverage in output_variables
        and config.CoverageMetric.BRANCH not in cov_metrics
    ):
        metrics_for_reinstrumentation.add(config.CoverageMetric.BRANCH)
        branch_cov_ff = ff.TestSuiteBranchCoverageFunction(executor)
        to_calculate.append((RuntimeVariable.FinalBranchCoverage, branch_cov_ff))
    elif config.CoverageMetric.BRANCH in cov_metrics:
        # If we optimised for branches, we still want to get the final branch coverage.
        to_calculate.append(
            (
                RuntimeVariable.FinalBranchCoverage,
                _get_coverage_ff_from_algorithm(
                    algorithm, ff.TestSuiteBranchCoverageFunction
                ),
            )
        )


def _run() -> ReturnCode:
    """Run setup, test generation, post-processing, export, and reporting.

    Returns:
        SETUP_FAILED if the setup did not succeed, NO_TESTS_GENERATED if the
        resulting test suite has size zero, OK otherwise.
    """
    if (setup_result := _setup_and_check()) is None:
        return ReturnCode.SETUP_FAILED
    executor, test_cluster, constant_provider = setup_result
    # traces slices for test cases after execution
    coverage_metrics = config.configuration.statistics_output.coverage_metrics
    if config.CoverageMetric.CHECKED in coverage_metrics:
        executor.add_observer(StatementSlicingObserver(executor.tracer))

    algorithm: GenerationAlgorithm = _instantiate_test_generation_strategy(
        executor, test_cluster, constant_provider
    )
    _LOGGER.info("Start generating test cases")
    generation_result = algorithm.generate_tests()
    if algorithm.resources_left():
        _LOGGER.info("Algorithm stopped before using all resources.")
    else:
        _LOGGER.info("Stopping condition reached")
        for stop in algorithm.stopping_conditions:
            _LOGGER.info("%s", stop)
    _LOGGER.info("Stop generating test cases")

    # Executions that happen after this point should not influence the
    # search statistics
    executor.clear_observers()

    _track_search_metrics(algorithm, generation_result, coverage_metrics)
    _remove_statements_after_exceptions(generation_result)
    _generate_assertions(executor, generation_result)
    tracked_metrics = _track_final_metrics(
        algorithm, executor, generation_result, constant_provider
    )

    # Export the generated test suites
    if (
        config.configuration.test_case_output.export_strategy
        == config.ExportStrategy.PY_TEST
    ):
        _export_chromosome(generation_result)

    if config.configuration.statistics_output.create_coverage_report:
        coverage_report = get_coverage_report(
            generation_result,
            executor,
            tracked_metrics,
        )
        render_coverage_report(
            coverage_report,
            Path(config.configuration.statistics_output.report_dir) / "cov_report.html",
            datetime.datetime.now(),  # noqa: DTZ005
        )
        render_xml_coverage_report(
            coverage_report,
            Path(config.configuration.statistics_output.report_dir) / "cov_report.xml",
            datetime.datetime.now(),  # noqa: DTZ005
        )
    _collect_miscellaneous_statistics(test_cluster)
    if not stat.write_statistics():
        _LOGGER.error("Failed to write statistics data")
    if generation_result.size() == 0:
        # not able to generate one test case
        return ReturnCode.NO_TESTS_GENERATED
    return ReturnCode.OK


def _remove_statements_after_exceptions(
    generation_result: tsc.TestSuiteChromosome,
) -> None:
    """Apply exception truncation and, if configured, further post-processing.

    Args:
        generation_result: The test suite to post-process
    """
    truncation = pp.ExceptionTruncation()
    generation_result.accept(truncation)
    if config.configuration.test_case_output.post_process:
        unused_primitives_removal = pp.TestCasePostProcessor(
            [pp.UnusedStatementsTestCaseVisitor()]
        )
        generation_result.accept(unused_primitives_removal)
        # TODO(fk) add more postprocessing stuff.


def _minimize_assertions(generation_result: tsc.TestSuiteChromosome) -> None:
    """Minimize the assertions of a test suite and track the resulting counts.

    Args:
        generation_result: The test suite whose assertions shall be minimized
    """
    _LOGGER.info("Minimizing assertions based on checked coverage")
    assertion_minimizer = pp.AssertionMinimization()
    generation_result.accept(assertion_minimizer)
    stat.track_output_variable(
        RuntimeVariable.Assertions, len(assertion_minimizer.remaining_assertions)
    )
    stat.track_output_variable(
        RuntimeVariable.DeletedAssertions,
        len(assertion_minimizer.deleted_assertions),
    )


# Maps each higher-order mutation strategy to the callable that builds it from
# the configured mutation order.
_strategies: dict[config.MutationStrategy, Callable[[int], ms.HOMStrategy]] = {
    config.MutationStrategy.FIRST_TO_LAST: ms.FirstToLastHOMStrategy,
    config.MutationStrategy.BETWEEN_OPERATORS: ms.BetweenOperatorsHOMStrategy,
    config.MutationStrategy.RANDOM: ms.RandomHOMStrategy,
    config.MutationStrategy.EACH_CHOICE: ms.EachChoiceHOMStrategy,
}


def _setup_mutant_generator() -> mu.Mutator:
    """Create the mutator for the configured mutation strategy.

    Returns:
        A first-order mutator for the FIRST_ORDER_MUTANTS strategy, a
        high-order mutator for the strategies listed in ``_strategies``.

    Raises:
        ConfigurationException: If the mutation order is not positive or no
            suitable mutation strategy is configured
    """
    operators: list[type[MutationOperator]] = [
        *mo.standard_operators,
        *mo.experimental_operators,
    ]

    mutation_strategy = config.configuration.test_case_output.mutation_strategy

    if mutation_strategy == config.MutationStrategy.FIRST_ORDER_MUTANTS:
        return mu.FirstOrderMutator(operators)

    # The order is validated before the strategy lookup, so an invalid order
    # is reported even when the strategy is unknown.
    order = config.configuration.test_case_output.mutation_order

    if order <= 0:
        raise ConfigurationException("Mutation order should be > 0.")

    if mutation_strategy in _strategies:
        hom_strategy = _strategies[mutation_strategy](order)
        return mu.HighOrderMutator(operators, hom_strategy=hom_strategy)

    raise ConfigurationException("No suitable mutation strategy found.")


def _setup_mutation_analysis_assertion_generator(
    executor: TestCaseExecutor,
) -> ag.MutationAnalysisAssertionGenerator:
    """Create the assertion generator that is based on mutation analysis.

    Args:
        executor: The test-case executor used by the assertion generator

    Returns:
        The mutation-analysis assertion generator
    """
    _LOGGER.info("Setup mutation generator")
    mutant_generator = _setup_mutant_generator()

    _LOGGER.info("Import module %s", config.configuration.module_name)
    module = importlib.import_module(config.configuration.module_name)

    _LOGGER.info("Build AST for %s", module.__name__)
    executor.tracer.current_thread_identifier = threading.current_thread().ident
    module_source_code = inspect.getsource(module)
    module_ast = ParentNodeTransformer.create_ast(module_source_code)

    _LOGGER.info("Mutate module %s", module.__name__)
    # The mutation controller gets its own tracer, separate from the executor's.
    mutation_tracer = ExecutionTracer()
    mutation_controller = ag.InstrumentedMutationController(
        mutant_generator, module_ast, module, mutation_tracer
    )
    assertion_generator = ag.MutationAnalysisAssertionGenerator(
        executor, mutation_controller
    )

    _LOGGER.info("Generated %d mutants", mutation_controller.mutant_count())
    return assertion_generator


def _generate_assertions(
    executor: TestCaseExecutor, generation_result: tsc.TestSuiteChromosome
) -> None:
    """Generate assertions for the test suite, unless disabled by configuration.

    Args:
        executor: The test-case executor used by the assertion generator
        generation_result: The test suite to add assertions to
    """
    ass_gen = config.configuration.test_case_output.assertion_generation
    if ass_gen != config.AssertionGenerator.NONE:
        _LOGGER.info("Start generating assertions")
        generator: cv.ChromosomeVisitor
        if ass_gen == config.AssertionGenerator.MUTATION_ANALYSIS:
            generator = _setup_mutation_analysis_assertion_generator(executor)
        else:
            generator = ag.AssertionGenerator(executor)
        generation_result.accept(generator)


def _track_search_metrics(
    algorithm: GenerationAlgorithm,
    generation_result: tsc.TestSuiteChromosome,
    coverage_metrics: list[config.CoverageMetric],
) -> None:
    """Track multiple set coverage metrics of the generated test suites.

    This possibly re-executes the test suites.

    Args:
        algorithm: The test generation strategy
        generation_result: The resulting chromosome of the generation strategy
        coverage_metrics: The selected coverage metrics to guide the search
    """
    for metric, runtime, fitness_type in [
        (
            config.CoverageMetric.LINE,
            RuntimeVariable.LineCoverage,
            ff.TestSuiteLineCoverageFunction,
        ),
        (
            config.CoverageMetric.BRANCH,
            RuntimeVariable.BranchCoverage,
            ff.TestSuiteBranchCoverageFunction,
        ),
        (
            config.CoverageMetric.CHECKED,
            RuntimeVariable.StatementCheckedCoverage,
            ff.TestSuiteStatementCheckedCoverageFunction,
        ),
    ]:
        if metric in coverage_metrics:
            coverage_function: ff.TestSuiteCoverageFunction = (
                _get_coverage_ff_from_algorithm(
                    algorithm, cast(type[ff.TestSuiteCoverageFunction], fitness_type)
                )
            )
            stat.track_output_variable(
                runtime, generation_result.get_coverage_for(coverage_function)
            )
    # Write overall coverage data of result
    stat.current_individual(generation_result)


def _instantiate_test_generation_strategy(
    executor: TestCaseExecutor,
    test_cluster: ModuleTestCluster,
    constant_provider: ConstantProvider,
) -> GenerationAlgorithm:
    """Create the search algorithm via the generation-algorithm factory.

    Args:
        executor: The test-case executor
        test_cluster: The test cluster of the module under test
        constant_provider: The constant provider

    Returns:
        The search algorithm returned by the factory
    """
    factory = gaf.TestSuiteGenerationAlgorithmFactory(
        executor, test_cluster, constant_provider
    )
    return factory.get_search_algorithm()


def _collect_miscellaneous_statistics(test_cluster: ModuleTestCluster) -> None:
    """Log cluster statistics and track run-identifying output variables.

    Args:
        test_cluster: The test cluster of the module under test
    """
    test_cluster.log_cluster_statistics()
    stat.track_output_variable(
        RuntimeVariable.TargetModule, config.configuration.module_name
    )
    stat.track_output_variable(RuntimeVariable.RandomSeed, randomness.RNG.get_seed())
    stat.track_output_variable(
        RuntimeVariable.ConfigurationId,
        config.configuration.statistics_output.configuration_id,
    )
    stat.track_output_variable(
        RuntimeVariable.RunId, config.configuration.statistics_output.run_id
    )
    stat.track_output_variable(
        RuntimeVariable.ProjectName, config.configuration.statistics_output.project_name
    )
    for runtime_variable, value in stat.variables_generator:
        stat.set_output_variable_for_runtime_variable(runtime_variable, value)


def _export_chromosome(
    chromosome: chrom.Chromosome,
    file_name_suffix: str = "",
) -> None:
    """Export the given chromosome.

    The target file is ``test_<module>[suffix].py`` inside the configured
    output path, where dots in the module name are replaced by underscores.

    Args:
        chromosome: the chromosome to export.
        file_name_suffix: Suffix that can be added to the file name to distinguish
            between different results e.g., failing and succeeding test cases.
    """
    module_name = config.configuration.module_name.replace(".", "_")
    target_file = (
        Path(config.configuration.test_case_output.output_path).resolve()
        / f"test_{module_name}{file_name_suffix}.py"
    )
    export_visitor = export.PyTestChromosomeToAstVisitor()
    chromosome.accept(export_visitor)
    export.save_module_to_file(
        export_visitor.to_module(),
        target_file,
        format_with_black=config.configuration.test_case_output.format_with_black,
    )
    _LOGGER.info("Written %i test cases to %s", chromosome.size(), target_file)