Skip to content

Commit

Permalink
fix: unittest generator bug
Browse files Browse the repository at this point in the history
* feat: better logging

* fix: mutant killing logic

* feat: improve mutation coverage logging and fix unittest generator bug
  • Loading branch information
jungs1 authored Aug 5, 2024
1 parent 3318627 commit ebe00b2
Showing 1 changed file with 81 additions and 34 deletions.
115 changes: 81 additions & 34 deletions src/mutahunter/core/unittest_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,20 @@
from mutahunter.core.controller import MutationTestController
from mutahunter.core.coverage_processor import CoverageProcessor
from mutahunter.core.db import MutationDatabase
from mutahunter.core.entities.config import (MutationTestControllerConfig,
UnittestGeneratorConfig)
from mutahunter.core.entities.config import (
MutationTestControllerConfig,
UnittestGeneratorConfig,
)
from mutahunter.core.error_parser import extract_error_message
from mutahunter.core.prompts.unittest_generator import (
FAILED_TESTS_TEXT, LINE_COV_UNITTEST_GENERATOR_USER_PROMPT,
MUTATION_COV_UNITTEST_GENERATOR_USER_PROMPT, MUTATION_WEAK_TESTS_TEXT)
FAILED_TESTS_TEXT,
LINE_COV_UNITTEST_GENERATOR_USER_PROMPT,
MUTATION_COV_UNITTEST_GENERATOR_USER_PROMPT,
MUTATION_WEAK_TESTS_TEXT,
)
from mutahunter.core.router import LLMRouter
from mutahunter.core.runner import MutantTestRunner
from mutahunter.core.logger import logger

SYSTEM_YAML_FIX = """
Based on the error message, the YAML content provided is not in the correct format. Please ensure the YAML content is in the correct format and try again.
Expand Down Expand Up @@ -67,25 +73,29 @@ def __init__(
self.config.source_file_path
)
self.file_version_id = file_version_id
self.num = 0

def run(self) -> None:
self.coverage_processor.parse_coverage_report()
initial_line_coverage_rate = self.coverage_processor.line_coverage_rate
print("Initial line coverage rate:", initial_line_coverage_rate)
logger.info(f"Initial line coverage rate: {initial_line_coverage_rate}")
self.increase_line_coverage()
logger.info(
f"Line coverage increased from {initial_line_coverage_rate*100:.2f}% to {self.coverage_processor.line_coverage_rate*100:.2f}%"
)
self.mutator.run()
latest_run_id = self.db.get_latest_run_id()
print("Latest run ID:", latest_run_id)
data = self.db.get_mutant_summary(latest_run_id)
self.latest_run_id = self.db.get_latest_run_id()
data = self.db.get_mutant_summary(self.latest_run_id)
logger.info(f"Data: {data}")
initial_mutation_coverage_rate = data["mutation_coverage"]
print("Initial mutation coverage rate:", initial_mutation_coverage_rate)
logger.info(f"Initial mutation coverage rate: {initial_mutation_coverage_rate}")
self.increase_mutation_coverage()
print(
logger.info(
f"Line coverage increased from {initial_line_coverage_rate*100:.2f}% to {self.coverage_processor.line_coverage_rate*100:.2f}%"
)
data = self.db.get_mutant_summary(latest_run_id)
data = self.db.get_mutant_summary(self.latest_run_id)
final_mutation_coverage_rate = data["mutation_coverage"]
print(
logger.info(
f"Mutation coverage increased from {initial_mutation_coverage_rate*100:.2f}% to {final_mutation_coverage_rate*100:.2f}%"
)

Expand All @@ -103,14 +113,14 @@ def increase_line_coverage(self):

def increase_mutation_coverage(self):
attempt = 0
latest_run_id = self.db.get_latest_run_id()
data = self.db.get_mutant_summary(latest_run_id)
data = self.db.get_mutant_summary(self.latest_run_id)
mutation_coverage_rate = data["mutation_coverage"]
self.failed_unittests = []
while (
mutation_coverage_rate < self.config.target_mutation_coverage_rate
and attempt < self.config.max_attempts
):
logger.info(f"Mutation coverage rate: {mutation_coverage_rate}")
attempt += 1
response = self.generate_unittests_for_mutants()
self._process_generated_unittests_for_mutation(response)
Expand Down Expand Up @@ -145,10 +155,22 @@ def generate_unittests(self):
prompt={"system": "", "user": user_template}, streaming=True
)
otuput = self.extract_response(response)
self._save_yaml(otuput, "line")
return otuput
except Exception as e:
raise

def _save_yaml(self, data, type):
if not os.path.exists("logs/_latest"):
os.makedirs("logs/_latest")
if not os.path.exists("logs/_latest/unittest"):
os.makedirs("logs/_latest/unittest")
if not os.path.exists(f"logs/_latest/unittest/{type}"):
os.makedirs(f"logs/_latest/unittest/{type}")
output = f"unittest_{self.num}.yaml"
with open(os.path.join(f"logs/_latest/unittest/{type}", output), "w") as f:
yaml.dump(data, f, default_flow_style=False, indent=2)

def _process_generated_unittests(self, response: dict) -> list:
generated_unittests = response.get("new_tests", [])
insertion_point_marker = response.get("insertion_point_marker", {})
Expand Down Expand Up @@ -213,31 +235,44 @@ def validate_unittest(
cwd=os.getcwd(),
)
if result.returncode == 0:
print("Test passed")

prev_line_coverage_rate = self.coverage_processor.line_coverage_rate
self.coverage_processor.parse_coverage_report()

if check_line_coverage:
if self.check_line_coverage_increase(prev_line_coverage_rate):
logger.info(f"Test passed and increased line cov:\n{test_code}")

return True
else:
logger.info(
f"Test passed but failed to increase line cov for\n{test_code}"
)
if check_mutantation_coverage:
if self.check_mutant_coverage_increase(
generated_unittest, test_code
):
logger.info(
f"Test passed and increased mutation cov:\n{test_code}"
)
return True
else:
logger.info(
f"Test passed but failed to increase mutation cov for\n{test_code}"
)
else:
print("Test failed")
logger.info(f"Test failed for\n{test_code}")
self._handle_failed_test(result, test_code)
except Exception as e:
print(f"Failed to validate unittest: {e}")
logger.info(f"Failed to validate unittest: {e}")
raise
else:
FileUtils.revert(self.config.test_file_path)
return False

def check_line_coverage_increase(self, prev_line_coverage_rate):
if self.coverage_processor.line_coverage_rate > prev_line_coverage_rate:
print(
logger.info(
f"Line coverage increased from {prev_line_coverage_rate*100:.2f}% to {self.coverage_processor.line_coverage_rate*100:.2f}%"
)
return True
Expand All @@ -247,12 +282,15 @@ def check_line_coverage_increase(self, prev_line_coverage_rate):
def check_mutant_coverage_increase(self, generated_unittest, test_code):
runner = MutantTestRunner(test_command=self.config.test_command)

mutants = self.db.get_survived_mutants_by_file_version_id(
file_version_id=self.file_version_id
)
mutants = self.db.get_survived_mutants_by_run_id(run_id=self.latest_run_id)
# logger.info(f"Mutants: {json.dumps(mutants, indent=2)}")

for mutant in mutants:
if mutant["id"] == generated_unittest["mutant_id"]:
if int(mutant["id"]) == int(generated_unittest["mutant_id"]):
logger.info(f"Mutant {mutant['id']} selected for testing")
logger.info(f"source file path: {self.config.source_file_path}")
logger.info(f"replacement module path: {mutant['mutant_path']}")
logger.info(f"test command: {self.config.test_command}")
result = runner.run_test(
{
"module_path": self.config.source_file_path,
Expand All @@ -261,17 +299,21 @@ def check_mutant_coverage_increase(self, generated_unittest, test_code):
}
)
if result.returncode == 0:
print("Mutation coverage did not increase")
logger.info(f"Mutant {mutant['id']} survived")
self.weak_unittests.append(
{
"code": test_code,
"survived_mutant_id": f"Mutant {mutant['id']} not killed",
}
)
else:
print("Mutation coverage increased!")
logger.info(f"Mutant {mutant['id']} killed")
self.db.update_mutant_status(mutant["id"], "KILLED")
return True
else:
logger.info(
f"Mutant {mutant['id']} not selected for testing. Generated mutant id: {generated_unittest['mutant_id']}"
)
return False

def _handle_failed_test(self, result, test_code):
Expand All @@ -282,10 +324,9 @@ def _handle_failed_test(self, result, test_code):
def generate_unittests_for_mutants(self) -> None:
source_code = FileUtils.read_file(self.config.source_file_path)
language = filename_to_lang(self.config.source_file_path)
self.db
# filter survived mutants
survived_mutants = self.db.get_survived_mutants_by_file_version_id(
self.file_version_id
survived_mutants = self.db.get_survived_mutants_by_run_id(
run_id=self.latest_run_id
)

if not survived_mutants:
Expand Down Expand Up @@ -317,10 +358,10 @@ def generate_unittests_for_mutants(self) -> None:
),
)
response, _, _ = self.router.generate_response(
prompt={"system": "", "user": user_template}, streaming=False
prompt={"system": "", "user": user_template}, streaming=True
)

resp = self.extract_response(response)
self._save_yaml(resp, "mutation")
return resp

def _process_generated_unittests_for_mutation(self, response) -> None:
Expand Down Expand Up @@ -392,9 +433,9 @@ def read_file(path: str) -> str:
with open(path, "r") as file:
return file.read()
except FileNotFoundError:
print(f"File not found: {path}")
logger.info(f"File not found: {path}")
except Exception as e:
print(f"Error reading file {path}: {e}")
logger.info(f"Error reading file {path}: {e}")
raise

@staticmethod
Expand All @@ -403,7 +444,7 @@ def backup_code(file_path: str) -> None:
try:
shutil.copyfile(file_path, backup_path)
except Exception as e:
print(f"Failed to create backup file for {file_path}: {e}")
logger.info(f"Failed to create backup file for {file_path}: {e}")
raise

@staticmethod
Expand All @@ -416,6 +457,12 @@ def insert_code(file_path: str, code: str, position: int) -> None:
lines.insert(position, code)
with open(file_path, "w") as file:
file.write("\n".join(lines))

# import uuid

# random_name = str(uuid.uuid4())[:4]
# with open(f"{random_name}.java", "w") as file:
# file.write("\n".join(lines))
except Exception as e:
raise

Expand All @@ -426,8 +473,8 @@ def revert(file_path: str) -> None:
if os.path.exists(backup_path):
shutil.copyfile(backup_path, file_path)
else:
print(f"No backup file found for {file_path}")
logger.info(f"No backup file found for {file_path}")
raise FileNotFoundError(f"No backup file found for {file_path}")
except Exception as e:
print(f"Failed to revert file {file_path}: {e}")
logger.info(f"Failed to revert file {file_path}: {e}")
raise

0 comments on commit ebe00b2

Please sign in to comment.