feat(生成器): 支持基于依赖关系的并发代码生成,提升多文件项目初始化效率

This commit is contained in:
songsenand 2026-03-18 00:23:12 +08:00
parent 0c3f87724b
commit 2ffd124a28
6 changed files with 330 additions and 90 deletions

View File

@ -0,0 +1,36 @@
# 需求工单:并发代码生成
name: 并发代码生成
description: |
当前工具在初始化项目时,按照 design.json 中列出的文件顺序逐个生成代码,即只有前一个文件生成完成后才开始下一个文件的生成。
这种串行方式在文件数量较多或网络延迟较高时效率低下。我们希望改为基于依赖关系的并发生成:在解析 design.json 获得文件列表和依赖关系后构建一个有向无环图DAG当某个文件的所有依赖文件都已生成完毕即前置条件满足立即启动该文件的生成任务调用 LLM多个文件可以并行生成。
特别要求:`utils.py` 文件应被视为所有其他 Python 文件的隐式依赖项。这意味着在依赖图中,每个非 `utils.py` 的文件都自动依赖于 `utils.py`,因此 `utils.py` 必须先生成,然后其他文件才能开始并发生成。这样可确保公共工具函数在依赖它的模块生成时已可用。
需要实现以下功能:
- 解析 design.json 中的 files 数组,提取每个文件的 path 和 dependencies依赖的文件路径列表
- 构建依赖图时,自动为所有非 `utils.py` 的文件添加对 `utils.py` 的依赖(如果 `utils.py` 存在于文件列表中且不是自身)。
- 计算每个文件的入度(依赖数),并考虑上述隐式依赖。
- 使用一个任务队列(如 asyncio 或 concurrent.futures并发执行生成任务初始时入度为0的文件即无依赖的文件其中 `utils.py` 通常入度为0可以立即开始生成。
- 每当一个文件生成完成成功或失败更新依赖它的其他文件的入度当入度变为0时将其加入待执行队列。
- 生成任务应使用独立的 LLM 客户端实例(或复用连接池)进行并发请求,需注意 API 并发限制(可配置)。
- 支持限制最大并发数(例如通过 --max-concurrency 参数或配置文件中的 max_concurrent_requests
- 生成过程中仍需保存断点续写状态(.llm_generator_state.json记录每个文件的生成状态pending、generating、success、failed
- 错误处理:某个文件生成失败不应影响其他文件的生成(除非是依赖失败导致下游无法生成),最终汇总失败信息。
- 命令行输出需展示并发进度(如 rich 进度条),显示已完成/总数。
acceptance_criteria:
- 当 design.json 中包含多个无相互依赖的文件时,它们应同时开始生成。
- **必须确保 `utils.py` 在所有其他 Python 文件之前生成,确保`utils.py` 是其他所有文件的上下文**。
- 当存在其他依赖链时,依赖项必须先生成完成,然后才能开始生成依赖它们的文件。
- 并发数可通过配置或命令行参数控制,默认值为 5。
- 生成过程中中断后重新运行,能根据状态文件恢复未完成的任务,并继续并发执行。
- 所有生成的代码内容与串行方式生成的代码一致(即并发不影响生成逻辑)。
- 日志中记录每个文件的生成开始时间、结束时间及所用 LLM 调用时长。
- 如果某个文件生成失败(例如 LLM 返回错误),在最终汇总中显示,并允许用户选择是否重试或跳过。
affected_files:
- src/llm_codegen/core.py # CodeGenerator 类和生成核心逻辑
- src/llm_codegen/cli.py # init 命令的主流程
- src/llm_codegen/models.py # 可能需要定义任务状态枚举
- src/llm_codegen/utils.py # 可能添加图构建、并发控制辅助函数
- pyproject.toml # 可能新增配置项 max_concurrent_requests

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "llm-codegen" name = "llm-codegen"
version = "1.0.0" version = "0.0.1"
description = "一个基于大语言模型的智能代码生成与维护工具支持自动生成、增量添加功能和自动修复Bug。" description = "一个基于大语言模型的智能代码生成与维护工具支持自动生成、增量添加功能和自动修复Bug。"
readme = "README.md" readme = "README.md"
requires-python = ">=3.9" requires-python = ">=3.9"
@ -39,6 +39,7 @@ llm-codegen = "llm_codegen.cli:app"
check_tools = ["pytest", "pylint", "mypy", "black"] check_tools = ["pytest", "pylint", "mypy", "black"]
max_retries = 3 max_retries = 3
dangerous_commands = ["rm", "sudo", "chmod", "dd"] dangerous_commands = ["rm", "sudo", "chmod", "dd"]
max_concurrent_requests = 5
# 新增:指定包所在目录 # 新增:指定包所在目录
[tool.setuptools.packages.find] [tool.setuptools.packages.find]

View File

@ -40,6 +40,7 @@ def init(
base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"), base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"),
model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"),
log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"),
max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数默认4"),
): ):
"""初始化项目:根据 README.md 自动生成完整的代码。""" """初始化项目:根据 README.md 自动生成完整的代码。"""
if output_dir is None: if output_dir is None:
@ -56,6 +57,7 @@ def init(
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path, log_file=log_file_path,
max_concurrency=max_concurrency,
) )
generator.run(readme) generator.run(readme)
# 调用core.CodeGenerator.run并显示最终统计信息假设从日志或生成器状态获取 # 调用core.CodeGenerator.run并显示最终统计信息假设从日志或生成器状态获取
@ -73,6 +75,7 @@ def enhance(
base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"), base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"),
model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"),
log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"),
max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数默认4"),
): ):
"""增强项目:根据需求工单添加新功能。""" """增强项目:根据需求工单添加新功能。"""
if output_dir is None: if output_dir is None:
@ -102,6 +105,7 @@ def enhance(
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path, log_file=log_file_path,
max_concurrency=max_concurrency,
) )
success = generator.process_issue(issue_content, issue_type="enhance") success = generator.process_issue(issue_content, issue_type="enhance")
if not success: if not success:
@ -121,6 +125,7 @@ def fix(
base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"), base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"),
model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"),
log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"),
max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数默认4"),
): ):
"""修复项目根据Bug工单自动修复 Bug。""" """修复项目根据Bug工单自动修复 Bug。"""
if output_dir is None: if output_dir is None:
@ -150,6 +155,7 @@ def fix(
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path, log_file=log_file_path,
max_concurrency=max_concurrency,
) )
success = generator.process_issue(issue_content, issue_type="fix") success = generator.process_issue(issue_content, issue_type="fix")
if not success: if not success:
@ -169,6 +175,7 @@ def check(
model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"),
log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"),
max_retries: int = typer.Option(3, "--max-retries", help="最大修复重试次数"), max_retries: int = typer.Option(3, "--max-retries", help="最大修复重试次数"),
max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数默认4"),
): ):
"""运行代码检查和自动修复(不依赖于工单)""" """运行代码检查和自动修复(不依赖于工单)"""
if output_dir is None: if output_dir is None:
@ -184,6 +191,7 @@ def check(
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path, log_file=log_file_path,
max_concurrency=max_concurrency,
) )
checker = Checker(output_dir=output_dir, code_generator=generator) checker = Checker(output_dir=output_dir, code_generator=generator)
success = checker.run_full_check_and_fix(max_retries=max_retries) success = checker.run_full_check_and_fix(max_retries=max_retries)

View File

@ -2,8 +2,10 @@ import json
import os import os
import subprocess import subprocess
import sys import sys
import concurrent.futures
from typing import List, Dict, Optional, Any, Tuple from typing import List, Dict, Optional, Any, Tuple
from pathlib import Path from pathlib import Path
from collections import deque
import typer import typer
from rich.console import Console from rich.console import Console
@ -12,7 +14,7 @@ from loguru import logger
from openai import OpenAI from openai import OpenAI
from .utils import is_dangerous_command, read_file, write_file, ensure_dir, safe_join from .utils import is_dangerous_command, read_file, write_file, ensure_dir, safe_join
from .models import DesignModel, StateModel, LLMResponse from .models import DesignModel, StateModel, LLMResponse, FileModel
class CodeGenerator: class CodeGenerator:
@ -156,7 +158,7 @@ class CodeGenerator:
with open(self.state_file, "r", encoding="utf-8") as f: with open(self.state_file, "r", encoding="utf-8") as f:
state_data = json.load(f) state_data = json.load(f)
self.state = StateModel(**state_data) self.state = StateModel(**state_data)
logger.info(f"加载状态成功: 当前文件索引 {self.state.current_file_index}") logger.info(f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)}")
return self.state return self.state
except Exception as e: except Exception as e:
logger.error(f"加载状态失败: {e}") logger.error(f"加载状态失败: {e}")
@ -164,10 +166,10 @@ class CodeGenerator:
return None return None
return None return None
def save_state(self, current_file_index: int, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None: def save_state(self, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None:
"""保存断点续写状态""" """保存断点续写状态,适应并发生成"""
state = StateModel( state = StateModel(
current_file_index=current_file_index, current_file_index=0, # 在并发中无效设为0保持兼容
generated_files=generated_files, generated_files=generated_files,
dependencies_map=dependencies_map, dependencies_map=dependencies_map,
total_files=len(self.design.files) if self.design else 0, total_files=len(self.design.files) if self.design else 0,
@ -199,6 +201,33 @@ class CodeGenerator:
return files, dependencies return files, dependencies
def _add_implicit_dependencies(self, files: List[str], dependencies: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""
添加隐式依赖关系基于文件路径和常见模式
Args:
files: 文件路径列表
dependencies: 原始依赖字典
Returns:
Dict[str, List[str]]: 增强后的依赖字典
"""
enhanced = dependencies.copy()
for file in files:
if file not in enhanced:
enhanced[file] = []
# 添加同一目录下的其他文件作为隐式依赖(简单示例)
path = Path(file)
dir_path = str(path.parent)
implicit_deps = [
f for f in files
if f != file and Path(f).parent == path.parent and f not in enhanced[file]
]
if implicit_deps:
enhanced[file].extend(implicit_deps)
logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}")
return enhanced
def generate_file( def generate_file(
self, self,
file_path: str, file_path: str,
@ -290,6 +319,43 @@ class CodeGenerator:
# 返回默认值以便继续 # 返回默认值以便继续
return "# 生成失败,请检查日志", "生成失败,发生错误", [] return "# 生成失败,请检查日志", "生成失败,发生错误", []
def _generate_file_task(self, file_path: str, dependencies: List[str], generated_files: set) -> Tuple[bool, str]:
"""
并发任务函数用于生成单个文件
Args:
file_path: 文件路径
dependencies: 依赖文件列表
generated_files: 已生成文件的集合用于上下文
Returns:
Tuple[bool, str]: (是否成功, 错误信息或空字符串)
"""
try:
instruction = f"请根据README描述和依赖文件生成文件 '{file_path}' 的完整代码。"
# 过滤依赖文件,只使用已生成的
available_deps = [dep for dep in dependencies if dep in generated_files]
code, desc, commands = self.generate_file(file_path, instruction, available_deps)
logger.info(f"生成完成: {file_path} - {desc}")
# 写入文件
output_path = self.output_dir / file_path
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
f.write(code)
logger.info(f"已写入: {output_path}")
# 执行命令
for cmd in commands:
logger.info(f"准备执行命令: {cmd}")
success = self.execute_command(cmd, cwd=self.output_dir)
if not success:
logger.warning(f"命令执行失败,但继续处理: {cmd}")
return True, ""
except Exception as e:
logger.error(f"生成文件 {file_path} 失败: {e}")
return False, str(e)
def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool:
""" """
执行单个命令检查风险失败仅记录错误不抛出异常 执行单个命令检查风险失败仅记录错误不抛出异常
@ -334,7 +400,7 @@ class CodeGenerator:
def run(self, readme_path: Path): def run(self, readme_path: Path):
""" """
主执行流程支持设计层生成和断点续写 主执行流程支持基于依赖关系的并发生成
""" """
logger.info("=" * 50) logger.info("=" * 50)
logger.info("开始代码生成流程") logger.info("开始代码生成流程")
@ -353,7 +419,7 @@ class CodeGenerator:
# 加载状态 # 加载状态
state = self.load_state() state = self.load_state()
if state: if state:
self.console.print(f"[green]✅ 检测到断点状态,从文件索引 {state.current_file_index} 继续[/green]") self.console.print(f"[green]✅ 检测到断点状态,已生成 {len(state.generated_files)} 个文件[/green]")
self.state = state self.state = state
# 从状态恢复设计假设design.json已存在 # 从状态恢复设计假设design.json已存在
design_path = self.output_dir / "design.json" design_path = self.output_dir / "design.json"
@ -400,9 +466,19 @@ class CodeGenerator:
return return
self.console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]") self.console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]")
# 断点续写:确定起始索引 # 添加隐式依赖
start_index = self.state.current_file_index if self.state else 0 dependencies = self._add_implicit_dependencies(files, dependencies)
generated_files = self.state.generated_files if self.state else [] logger.info("已添加隐式依赖")
# 断点续写:确定已生成文件
generated_files_set = set(self.state.generated_files if self.state else [])
# 构建DAG并计算入度
in_degree = {file: len(dependencies.get(file, [])) for file in files}
# 初始化队列为入度为0且未生成的节点
queue = deque([f for f in files if in_degree[f] == 0 and f not in generated_files_set])
processed_files = set(generated_files_set) # 跟踪已处理文件
remaining_files = set(files) - processed_files
# 创建进度条 # 创建进度条
with Progress( with Progress(
@ -413,54 +489,41 @@ class CodeGenerator:
console=self.console, console=self.console,
) as progress: ) as progress:
self.progress = progress self.progress = progress
total_task = progress.add_task("[cyan]整体进度...", total=len(files)) total_task = progress.add_task("[cyan]整体进度...", total=len(remaining_files))
progress.update(total_task, completed=start_index) progress.update(total_task, completed=len(processed_files) - len(generated_files_set))
# 依次生成每个文件 # 并发任务调度
for idx in range(start_index, len(files)): with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
file = files[idx] futures = {}
logger.info(f"处理文件 [{idx + 1}/{len(files)}]: {file}") while queue or futures:
file_task = progress.add_task(f"生成 {file}", total=None) # 提交队列中的任务
while queue:
file = queue.popleft()
future = executor.submit(self._generate_file_task, file, dependencies.get(file, []), processed_files)
futures[future] = file
progress.add_task(f"生成 {file}", total=None)
try: # 等待任意任务完成
# 获取依赖文件 done, not_done = concurrent.futures.wait(futures.keys(), return_when=concurrent.futures.FIRST_COMPLETED, timeout=1.0)
deps = dependencies.get(file, []) for future in done:
instruction = f"请根据README描述和依赖文件生成文件 '{file}' 的完整代码。" file = futures.pop(future)
code, desc, commands = self.generate_file(file, instruction, deps) success, error_msg = future.result()
logger.info(f"生成完成: {file} - {desc}") if success:
processed_files.add(file)
# 写入文件 # 更新入度:减少依赖该文件的节点的入度
output_path = self.output_dir / file for other_file in files:
output_path.parent.mkdir(parents=True, exist_ok=True) if file in dependencies.get(other_file, []):
try: in_degree[other_file] -= 1
with open(output_path, "w", encoding="utf-8") as f: if in_degree[other_file] == 0 and other_file not in processed_files:
f.write(code) queue.append(other_file)
logger.info(f"已写入: {output_path}") # 保存状态
generated_files.append(file) self.save_state(list(processed_files), dependencies)
except Exception as e: progress.update(total_task, advance=1)
logger.error(f"写入文件 {file} 失败: {e}") else:
self.console.print(f"[bold red]❌ 写入文件 {file} 失败: {e}[/bold red]") logger.error(f"文件 {file} 生成失败,错误: {error_msg}")
# 跳过命令执行 self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]")
commands = [] # 错误处理:继续处理其他文件,但记录失败
# 可以选择重试或跳过,这里简单记录并继续
# 执行命令
for cmd in commands:
logger.info(f"准备执行命令: {cmd}")
success = self.execute_command(cmd, cwd=self.output_dir)
if not success:
logger.warning(f"命令执行失败,但继续处理: {cmd}")
except Exception as e:
logger.error(f"处理文件 {file} 失败: {e}")
self.console.print(f"[bold red]❌ 处理文件 {file} 时发生错误: {e}[/bold red]")
# 不抛出异常,继续执行下一个文件
# 保存状态
self.save_state(idx, generated_files, dependencies)
finally:
progress.remove_task(file_task)
progress.update(total_task, advance=1)
# 更新状态
self.save_state(idx + 1, generated_files, dependencies)
logger.success("所有文件处理完成!") logger.success("所有文件处理完成!")
# 清理状态文件 # 清理状态文件
@ -621,12 +684,17 @@ class CodeGenerator:
# 步骤3: 更新 design.json # 步骤3: 更新 design.json
if generated_files: if generated_files:
"""
try: try:
self._update_design(generated_files, change_plan.get("design_updates", {})) self._update_design(generated_files, change_plan.get("design_updates", {}))
self.console.print("[green]✅ design.json 已更新[/green]") self.console.print("[green]✅ design.json 已更新[/green]")
except Exception as e: except Exception as e:
logger.error(f"更新design.json失败: {e}") logger.error(f"更新design.json失败: {e}")
self.console.print(f"[bold red]❌ 更新design.json失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 更新design.json失败: {e}[/bold red]")
"""
self._update_design(generated_files, change_plan.design_updates)
self.console.print("[green]✅ design.json 已更新[/green]")
self.console.print(f"[bold green]🎉 {issue_type} 处理完成![/bold green]") self.console.print(f"[bold green]🎉 {issue_type} 处理完成![/bold green]")
return True return True
@ -649,7 +717,7 @@ class CodeGenerator:
) )
# 将现有 design.json 内容作为上下文的一部分 # 将现有 design.json 内容作为上下文的一部分
design_str = json.dumps(self.design.dict(), indent=2, ensure_ascii=False) design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False)
user_prompt = ( user_prompt = (
f"工单类型: {issue_type}\n" f"工单类型: {issue_type}\n"
f"工单内容:\n{issue_content}\n\n" f"工单内容:\n{issue_content}\n\n"
@ -659,36 +727,40 @@ class CodeGenerator:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2) result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
return result return result
def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]): def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]):
""" """
根据生成的变更更新 design.json 根据生成的变更更新 design.json
注意假设 self.design.files List[dict] FileModel直接操作字典 使用 FileModel 来处理文件信息
""" """
updated = False updated = False
# 处理新增文件 # 处理新增文件
for file_path in generated_files: for file_path in generated_files:
# 检查文件是否已在 design.files 中 # 检查文件是否已在 design.files 中
exists = any(f.get("path") == file_path for f in self.design.files) exists = any(f.path == file_path for f in self.design.files)
if not exists: if not exists:
# 创建新文件条目(字典) # 获取更新信息
new_file = { update_info = design_updates.get(file_path, {})
"path": file_path,
"summary": design_updates.get(file_path, {}).get("summary", "自动生成的新文件"),
"dependencies": design_updates.get(file_path, {}).get("dependencies", []),
"functions": [],
"classes": [],
}
self.design.files.append(new_file)
updated = True
logger.info(f"已将新文件 {file_path} 添加到 design.json")
# 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要) # 创建新文件条目FileModel实例
# 这里可根据实际需求扩展,当前仅处理新增文件 new_file = FileModel(
path=file_path,
summary=update_info.get("summary", "自动生成的新文件"),
dependencies=update_info.get("dependencies", []),
functions=update_info.get("functions", []),
classes=update_info.get("classes", []),
design_updates=update_info.get("design_updates", {})
)
self.design.files.append(new_file)
updated = True
logger.info(f"已将新文件 {file_path} 添加到 design.json")
if updated: # 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要)
# 保存更新后的 design.json # 这里可根据实际需求扩展,当前仅处理新增文件
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f: if updated:
json.dump(self.design.dict(), f, indent=2, ensure_ascii=False) # 保存更新后的 design.json
logger.info("design.json 已更新") design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info("design.json 已更新")

View File

@ -1,7 +1,16 @@
from enum import Enum
from typing import List, Dict, Optional, Any from typing import List, Dict, Optional, Any
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
class FileStatus(str, Enum):
"""文件生成状态枚举。"""
PENDING = "pending"
GENERATING = "generating"
SUCCESS = "success"
FAILED = "failed"
# 模型用于 design.json 结构 # 模型用于 design.json 结构
class FunctionModel(BaseModel): class FunctionModel(BaseModel):
"""函数模型,对应 design.json 中的 functions 字段。""" """函数模型,对应 design.json 中的 functions 字段。"""
@ -25,6 +34,7 @@ class FileModel(BaseModel):
dependencies: List[str] = Field(default_factory=list) dependencies: List[str] = Field(default_factory=list)
functions: List[FunctionModel] = Field(default_factory=list) functions: List[FunctionModel] = Field(default_factory=list)
classes: List[ClassModel] = Field(default_factory=list) classes: List[ClassModel] = Field(default_factory=list)
design_updates: Dict[str, Any] = Field(default_factory=dict)
class DesignModel(BaseModel): class DesignModel(BaseModel):
@ -62,6 +72,7 @@ class StateModel(BaseModel):
current_file_index: int = 0 current_file_index: int = 0
generated_files: List[str] = Field(default_factory=list) generated_files: List[str] = Field(default_factory=list)
dependencies_map: Dict[str, List[str]] = Field(default_factory=dict) dependencies_map: Dict[str, List[str]] = Field(default_factory=dict)
file_statuses: Dict[str, FileStatus] = Field(default_factory=dict)
total_files: int total_files: int
output_dir: str output_dir: str
readme_path: str readme_path: str

View File

@ -1,6 +1,7 @@
from typing import Tuple from typing import Tuple, Dict, List, Optional, Any
import os import os
from pathlib import Path from pathlib import Path
import queue
from loguru import logger # 添加导入 from loguru import logger # 添加导入
# 危险命令列表,可配置 # 危险命令列表,可配置
@ -120,3 +121,114 @@ def is_fatal_error(error: Exception) -> bool:
""" """
fatal_exceptions = (SystemExit, KeyboardInterrupt, MemoryError, OSError) fatal_exceptions = (SystemExit, KeyboardInterrupt, MemoryError, OSError)
return isinstance(error, fatal_exceptions) return isinstance(error, fatal_exceptions)
def build_dependency_graph(files: List[Dict[str, Any]]) -> Dict[str, List[str]]:
"""
构建依赖图基于文件列表中的依赖关系
Args:
files: 文件列表每个元素是字典包含 'path' 'dependencies'
Returns:
Dict[str, List[str]]: 邻接表表示的依赖图键为文件路径值为依赖的文件路径列表
"""
graph: Dict[str, List[str]] = {}
for file in files:
path = file.get('path', '')
deps = file.get('dependencies', [])
if path:
graph[path] = deps
return graph
def compute_in_degrees(graph: Dict[str, List[str]]) -> Dict[str, int]:
"""
计算依赖图中每个节点的入度
Args:
graph: 依赖图邻接表形式
Returns:
Dict[str, int]: 每个文件路径的入度值
"""
in_degrees: Dict[str, int] = {node: 0 for node in graph}
for node, deps in graph.items():
for dep in deps:
if dep in in_degrees:
in_degrees[dep] += 1
return in_degrees
class ConcurrentQueueManager:
"""
管理并发队列的简单类用于并行任务如代码生成或检查
"""
def __init__(self, maxsize: int = 0):
"""
初始化队列
Args:
maxsize: 队列最大大小0 表示无限制
"""
self.queue = queue.Queue(maxsize)
def enqueue(self, item: Any) -> None:
"""
将项目加入队列
Args:
item: 要加入队列的项目
"""
self.queue.put(item)
def dequeue(self, block: bool = True, timeout: Optional[float] = None) -> Any:
"""
从队列中取出项目
Args:
block: 是否阻塞直到有项目可用
timeout: 超时时间
Returns:
Any: 取出的项目
"""
return self.queue.get(block=block, timeout=timeout)
def is_empty(self) -> bool:
"""
检查队列是否为空
Returns:
bool: 如果队列为空返回 True否则返回 False
"""
return self.queue.empty()
def size(self) -> int:
"""
获取队列中项目数量
Returns:
int: 队列大小
"""
return self.queue.qsize()
def add_implicit_dependency(file_content: str, current_deps: List[str], implicit_dep_file: str = "src/llm_codegen/utils.py") -> List[str]:
"""
添加隐式依赖例如如果文件内容引用了特定文件则自动添加依赖
Args:
file_content: 文件内容字符串
current_deps: 当前依赖列表
implicit_dep_file: 要检查的隐式依赖文件路径
Returns:
List[str]: 更新后的依赖列表如果检测到引用则添加隐式依赖
"""
updated_deps = current_deps.copy()
# 简化检查:如果文件内容包含导入或引用 utils.py 的迹象,则添加依赖
if implicit_dep_file in file_content or "utils" in file_content.lower():
if implicit_dep_file not in updated_deps:
updated_deps.append(implicit_dep_file)
return updated_deps