refactor(llm_codegen): 拆分 core.py 为基类和子命令专用类

This commit is contained in:
songsenand 2026-03-20 06:16:23 +08:00
parent 97de36207c
commit 42e63f2d93
7 changed files with 1753 additions and 376 deletions

View File

@ -230,6 +230,63 @@
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "src/llm_codegen/init_generator.py",
"summary": "初始化命令生成器,处理 init 命令逻辑",
"dependencies": [
"src/llm_codegen/core.py",
"src/llm_codegen/models.py"
],
"functions": [],
"classes": [
{
"name": "InitGenerator",
"summary": "继承自 BaseGenerator包含 run 方法",
"methods": [
"run"
]
}
],
"design_updates": {}
},
{
"path": "src/llm_codegen/enhance_generator.py",
"summary": "增强命令生成器,处理 enhance 命令逻辑",
"dependencies": [
"src/llm_codegen/core.py",
"src/llm_codegen/models.py"
],
"functions": [],
"classes": [
{
"name": "EnhanceGenerator",
"summary": "继承自 BaseGenerator包含 process_enhance 方法",
"methods": [
"process_enhance"
]
}
],
"design_updates": {}
},
{
"path": "src/llm_codegen/fix_generator.py",
"summary": "修复命令生成器,处理 fix 命令逻辑",
"dependencies": [
"src/llm_codegen/core.py",
"src/llm_codegen/models.py"
],
"functions": [],
"classes": [
{
"name": "FixGenerator",
"summary": "继承自 BaseGenerator包含 process_fix 方法",
"methods": [
"process_fix"
]
}
],
"design_updates": {}
}
],
"commands": [

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,10 @@ from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
from loguru import logger
from .core import CodeGenerator
from .core import BaseGenerator
from .init_generator import InitGenerator
from .enhance_generator import EnhanceGenerator
from .fix_generator import FixGenerator
from .checker import Checker
app = typer.Typer(help="基于LLM的自动化代码生成与维护工具")
@ -71,7 +74,7 @@ def init(
console=console,
) as progress:
task_id = progress.add_task("正在初始化项目...", total=1) # 修改设置总任务数为1以控制进度显示
generator = CodeGenerator(
generator = InitGenerator(
api_key=api_key,
base_url=base_url,
model=model,
@ -142,7 +145,7 @@ def enhance(
console=console,
) as progress:
task_id = progress.add_task("正在增强项目...", total=1) # 修改设置总任务数为1以控制进度显示
generator = CodeGenerator(
generator = EnhanceGenerator(
api_key=api_key,
base_url=base_url,
model=model,
@ -150,7 +153,7 @@ def enhance(
log_file=log_file_path,
max_concurrency=max_concurrency,
)
success = generator.process_issue(issue_content, issue_type="enhance")
success = generator.process_enhance(issue_file, output_format="full")
if success:
progress.update(task_id, completed=1, description="增强处理完成") # 修改:成功时更新完成状态
else:
@ -213,7 +216,7 @@ def fix(
console=console
) as progress:
task_id = progress.add_task("正在修复项目...", total=1) # 修改设置总任务数为1以控制进度显示
generator = CodeGenerator(
generator = FixGenerator(
api_key=api_key,
base_url=base_url,
model=model,
@ -221,7 +224,7 @@ def fix(
log_file=log_file_path,
max_concurrency=max_concurrency,
)
success = generator.process_issue(issue_content, issue_type="fix")
success = generator.process_fix(issue_file, output_format="full")
if success:
progress.update(task_id, completed=1, description="修复处理完成") # 修改:成功时更新完成状态
else:
@ -285,7 +288,7 @@ def design(
console=console,
) as progress:
task_id = progress.add_task("正在生成design.json...", total=1) # 可选:保持现有风格,但工单未要求修改此命令
generator = CodeGenerator(
generator = BaseGenerator(
api_key=api_key,
base_url=base_url,
model=model,
@ -330,7 +333,7 @@ def check(
log_file_path = init_logging(output_dir, log_file, command_name="check")
try:
generator = CodeGenerator(
generator = BaseGenerator(
api_key=api_key,
base_url=base_url,
model=model,

View File

@ -19,8 +19,8 @@ from .models import DesignModel, StateModel, FileModel, FileStatus # 添加 Fil
from .diff_applier import parse_diff, apply_diff
class CodeGenerator:
"""代码生成器,封装所有逻辑,支持设计层、断点续写和命令执行"""
class BaseGenerator:
"""代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行"""
def __init__(
self,
@ -80,8 +80,6 @@ class CodeGenerator:
调用LLM并返回解析后的JSON
"""
logger.debug(f"调用LLM模型: {self.model}")
logger.debug(f"System: {system_prompt[:200]}...")
logger.debug(f"User: {user_prompt[:200]}...")
try:
response = self.client.chat.completions.create(
@ -577,370 +575,6 @@ class CodeGenerator:
self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]")
return False
def run(self, readme_path: Path) -> None:
    """Main init flow: parse the README, obtain design.json (fresh or
    resumed), then generate every project file concurrently in dependency
    (topological) order using a thread pool.

    Args:
        readme_path: Path to the README file describing the project.

    Returns:
        None. Fatal errors are logged, printed, and cause an early return.
    """
    logger.info("=" * 50)
    logger.info("开始代码生成流程")
    logger.info(f"README: {readme_path}")
    logger.info(f"输出目录: {self.output_dir}")
    # Parse the README — the one step with no fallback.
    self.console.print("[bold yellow]🔍 正在解析README...[/bold yellow]")
    try:
        self.readme_content = self.parse_readme(readme_path)
    except Exception as e:
        logger.error(f"解析README失败无法继续: {e}")
        self.console.print(f"[bold red]❌ 解析README失败无法继续: {e}[/bold red]")
        return  # fatal: cannot continue without the README
    # Load checkpoint state to support resuming an interrupted run.
    state = self.load_state()
    if state:
        self.console.print(f"[green]✅ 检测到断点状态,已生成 {len(state.generated_files)} 个文件[/green]")
        self.state = state
        # Resuming: design.json should already exist on disk — reload it.
        design_path = self.output_dir / "design.json"
        if design_path.exists():
            try:
                with open(design_path, "r", encoding="utf-8") as f:
                    design_data = json.load(f)
                self.design = DesignModel(**design_data)
            except Exception as e:
                logger.error(f"加载design.json失败: {e}")
                self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
                self.console.print("[bold yellow]⚠ design.json损坏重新生成...[/bold yellow]")
                # Corrupt design.json: regenerate it from the README.
                try:
                    self.design = self.generate_design_json()
                except Exception as e2:
                    logger.error(f"重新生成design.json失败: {e2}")
                    self.console.print(f"[bold red]❌ 重新生成design.json失败: {e2}[/bold red]")
                    return
        else:
            # State exists but design.json is missing — regenerate it.
            self.console.print("[bold yellow]⚠ design.json不存在重新生成...[/bold yellow]")
            try:
                self.design = self.generate_design_json()
            except Exception as e:
                logger.error(f"生成design.json失败: {e}")
                self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]")
                return
    else:
        # Fresh run: generate the design from scratch and clear any state.
        self.console.print("[bold yellow]📋 正在生成设计文件...[/bold yellow]")
        try:
            self.design = self.generate_design_json()
            self.state = None
        except Exception as e:
            logger.error(f"生成design.json失败: {e}")
            self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]")
            return
    # Derive the file list and the dependency graph from the design.
    self.console.print("[bold yellow]📋 正在分析项目结构...[/bold yellow]")
    try:
        files, dependencies = self.get_project_structure()
    except Exception as e:
        logger.error(f"获取项目结构失败: {e}")
        self.console.print(f"[bold red]❌ 获取项目结构失败: {e}[/bold red]")
        return
    self.console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]")
    # Implicit-dependency injection is intentionally disabled for now.
    # dependencies = self._add_implicit_dependencies(files, dependencies)
    # logger.info("已添加隐式依赖")
    # Topological sort doubles as a cycle check on the dependency graph.
    try:
        sorted_files = self._topological_sort(files, dependencies)
        logger.info(f"拓扑排序成功,文件顺序: {sorted_files}")
    except ValueError as e:
        logger.error(f"依赖关系错误: {e}")
        self.console.print(f"[bold red]❌ 依赖关系错误: {e}[/bold red]")
        return  # abort generation on cyclic dependencies
    # Resume support: files finished in a previous run are skipped.
    generated_files_set = set(self.state.generated_files if self.state else [])
    # Kahn's algorithm: compute in-degree of every node in the DAG.
    in_degree = {file: len(dependencies.get(file, [])) for file in files}
    # Seed the queue with not-yet-generated files that have no pending deps.
    queue = deque([f for f in files if in_degree[f] == 0 and f not in generated_files_set])
    processed_files = set(generated_files_set)  # every file completed so far
    remaining_files = set(files) - processed_files
    # Progress bar covering the whole generation phase.
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        console=self.console,
    ) as progress:
        self.progress = progress
        total_task = progress.add_task("[cyan]整体进度...", total=len(remaining_files))
        progress.update(total_task, completed=len(processed_files) - len(generated_files_set))
        # Per-file progress task ids, keyed by file path.
        file_tasks = {}
        # Concurrent scheduling: submit ready files, harvest completions,
        # and release dependents as their in-degree drops to zero.
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
            futures = {}
            while queue or futures:
                # Submit every file that is currently ready.
                while queue:
                    file = queue.popleft()
                    future = executor.submit(self._generate_file_task, file, dependencies.get(file, []), processed_files)
                    futures[future] = file
                    # One progress task per in-flight file, labelled with status.
                    task_id = progress.add_task(f"{file} - {FileStatus.GENERATING}", total=1)
                    file_tasks[file] = task_id
                # Block until at least one task completes (1 s poll so the
                # spinner keeps animating); not_done is intentionally unused.
                done, not_done = concurrent.futures.wait(futures.keys(), return_when=concurrent.futures.FIRST_COMPLETED, timeout=1.0)
                for future in done:
                    file = futures.pop(future)
                    try:
                        success, error_msg = future.result()
                        # Close this file's progress task with its final status.
                        if file in file_tasks:
                            if success:
                                progress.update(file_tasks[file], completed=1, description=f"{file} - {FileStatus.SUCCESS}")
                                progress.remove_task(file_tasks[file])
                            else:
                                # Show the failure reason before removing the task.
                                progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}")
                                progress.remove_task(file_tasks[file])
                            del file_tasks[file]  # drop the mapping entry
                        if success:
                            processed_files.add(file)
                            # Decrement in-degree of dependents; enqueue any
                            # that have just become ready.
                            for other_file in files:
                                if file in dependencies.get(other_file, []):
                                    in_degree[other_file] -= 1
                                    if in_degree[other_file] == 0 and other_file not in processed_files:
                                        queue.append(other_file)
                            # Checkpoint after every successful file.
                            self.save_state(list(processed_files), dependencies)
                            progress.update(total_task, advance=1)  # overall bar
                        else:
                            logger.error(f"文件 {file} 生成失败,错误: {error_msg}")
                            self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]")
                            # Keep going: one failed file does not abort the run.
                    except Exception as e:
                        # The future itself raised; treat it as a failed file.
                        logger.error(f"任务 {file} 执行时发生异常: {e}")
                        self.console.print(f"[bold red]❌ 任务 {file} 执行时发生异常: {e}[/bold red]")
                        success = False
                        error_msg = str(e)
                        # Same failure handling as the `else` branch above.
                        if file in file_tasks:
                            progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}")
                            progress.remove_task(file_tasks[file])
                            del file_tasks[file]  # drop the mapping entry
                        logger.error(f"文件 {file} 生成失败,错误: {error_msg}")
                        self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]")
                        # Keep going despite the failure.
    logger.success("所有文件处理完成!")
    # Remove the checkpoint file; a finished run needs no resume state.
    if self.state_file.exists():
        try:
            self.state_file.unlink()
            logger.info("状态文件已清理")
        except Exception as e:
            logger.error(f"清理状态文件失败: {e}")
            self.console.print(f"[bold red]❌ 清理状态文件失败: {e}[/bold red]")
def process_issue(self, issue_content: str, issue_type: str) -> bool:
    """Handle a feature-enhancement or bug-fix ticket against an existing,
    already-initialized project.

    Args:
        issue_content: Raw text of the ticket file.
        issue_type: 'enhance' or 'fix'.

    Returns:
        bool: True on success (including "no affected files"), False on
        any fatal failure (missing/broken design.json, analysis failure,
        cyclic dependencies).
    """
    logger.info(f"开始处理 {issue_type} 工单")
    self.console.print(f"[bold yellow]📋 正在分析 {issue_type} 工单...[/bold yellow]")
    # Load the existing design.json; the project must have been initialized.
    design_path = self.output_dir / "design.json"
    if not design_path.exists():
        logger.error(f"design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。")
        self.console.print(f"[bold red]❌ design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。[/bold red]")
        return False
    try:
        with open(design_path, "r", encoding="utf-8") as f:
            design_data = json.load(f)
        self.design = DesignModel(**design_data)
    except Exception as e:
        logger.error(f"加载design.json失败: {e}")
        self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
        return False
    # Load README content for extra context, if present (read failure is
    # non-fatal — we fall back to an empty string).
    readme_path = self.output_dir / "README.md"
    if readme_path.exists():
        try:
            with open(readme_path, "r", encoding="utf-8") as f:
                self.readme_content = f.read()
        except Exception as e:
            logger.error(f"读取README.md失败: {e}")
            self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]")
            self.readme_content = ""
    else:
        self.readme_content = ""
    # Step 1: ask the LLM to analyze the ticket and produce a change plan.
    try:
        change_plan = self._analyze_issue(issue_content, issue_type)
    except Exception as e:
        logger.error(f"分析工单失败: {e}")
        self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]")
        return False
    if not change_plan:
        logger.error("无法生成变更计划")
        self.console.print("[bold red]❌ 无法生成变更计划[/bold red]")
        return False
    affected_files = change_plan.get("affected_files", [])
    if not affected_files:
        logger.warning("工单分析结果未指定任何受影响文件")
        self.console.print("[yellow]⚠ 工单分析结果未指定任何受影响文件[/yellow]")
        return True  # nothing to change counts as success
    self.console.print(f"[green]✅ 分析完成,将处理 {len(affected_files)} 个文件[/green]")
    # Order affected files so dependencies are processed before dependents,
    # using the dependency edges recorded in design.json.
    # Build a dependency dict (restricted to affected files) for the sort.
    dependencies_dict = {}
    for file_info in affected_files:
        path = file_info["path"]
        # Look up this file's dependencies in design.json.
        deps = []
        for f in self.design.files:
            if f.path == path:
                deps = f.dependencies
                break
        # Keep only deps that are themselves affected, so the sort orders
        # the internal edges of this change set.
        affected_paths_set = set(info["path"] for info in affected_files)
        filtered_deps = [dep for dep in deps if dep in affected_paths_set]
        dependencies_dict[path] = filtered_deps
    # Topologically sort the affected files.
    try:
        sorted_paths = self._topological_sort([info["path"] for info in affected_files], dependencies_dict)
    except ValueError as e:
        logger.error(f"依赖关系排序失败: {e}")
        self.console.print(f"[bold red]❌ 依赖关系排序失败: {e}[/bold red]")
        return False  # abort on unsortable (cyclic) dependencies
    # Re-order the file-info dicts to match the sorted path order.
    file_info_map = {info["path"]: info for info in affected_files}
    sorted_affected_files = [file_info_map[path] for path in sorted_paths]
    # Step 2: process files one by one, in dependency order.
    generated_files = []
    for file_info in sorted_affected_files:
        file_path = file_info["path"]
        action = file_info.get("action", "modify")  # 'modify' or 'create'
        description = file_info.get("description", "")
        dependencies = file_info.get("dependencies", [])
        logger.info(f"处理文件: {file_path} (操作: {action})")
        # For a modification, read the current file content as context.
        existing = None
        full_path = self.output_dir / file_path
        if action == "modify" and full_path.exists():
            try:
                with open(full_path, "r", encoding="utf-8") as f:
                    existing = f.read()
            except Exception as e:
                logger.error(f"读取文件 {file_path} 失败: {e}")
                self.console.print(f"[bold red]❌ 读取文件 {file_path} 失败: {e}[/bold red]")
                existing = None  # unreadable — fall back to treating as new
        elif action == "create" and full_path.exists():
            logger.warning(f"文件 {file_path} 已存在,将覆盖")
            self.console.print(f"[yellow]⚠ 文件 {file_path} 已存在,将覆盖[/yellow]")
            existing = None  # create mode overwrites; ignore existing content
        # Collect only dependency files that actually exist on disk.
        dep_paths = []
        missing_deps = []
        for dep in dependencies:
            dep_full = self.output_dir / dep
            if dep_full.exists():
                dep_paths.append(dep)
            else:
                missing_deps.append(dep)
        if missing_deps:
            logger.warning(f"依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}")
            self.console.print(f"[yellow]⚠ 依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}[/yellow]")
        # Build the generation instruction for the LLM.
        instruction = f"请根据工单描述{'修改' if action == 'modify' else '生成'}文件 '{file_path}'.\n"
        instruction += f"工单内容摘要:{description}\n"
        if action == "modify":
            instruction += "请在现有代码基础上进行修改,保持原有风格和功能不变。"
        else:
            instruction += "请生成完整的代码文件。"
        # Generate (or regenerate) the file content via the LLM.
        code, desc, commands = self.generate_file(
            file_path,
            instruction,
            dep_paths,
            existing_content=existing,
            output_format="full",
        )
        logger.info(f"生成完成: {file_path} - {desc}")
        # Write the result to disk (creating parent directories).
        full_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(code)
            logger.info(f"已写入: {full_path}")
            generated_files.append(file_path)
        except Exception as e:
            logger.error(f"写入文件 {file_path} 失败: {e}")
            self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]")
            # Skip command execution when the file could not be written.
            commands = []
        # Run any commands associated with this file (best-effort).
        for cmd in commands:
            logger.info(f"准备执行命令: {cmd}")
            success = self.execute_command(cmd, cwd=self.output_dir)
            if not success:
                logger.warning(f"命令执行失败,但继续处理: {cmd}")
    # Step 3: update design.json to reflect the changes.
    if generated_files:
        """
        try:
            self._update_design(generated_files, change_plan.get("design_updates", {}))
            self.console.print("[green]✅ design.json 已更新[/green]")
        except Exception as e:
            logger.error(f"更新design.json失败: {e}")
            self.console.print(f"[bold red]❌ 更新design.json失败: {e}[/bold red]")
        """
        # NOTE(review): the string literal above is disabled code kept for
        # reference; the live call below has no try/except, so a failure in
        # _update_design propagates out of this method — confirm intended.
        logger.info(f'change_plan: {change_plan}')
        self._update_design(generated_files, change_plan.get("design_updates", {}))
        self.console.print("[green]✅ design.json 已更新[/green]")
    self.console.print(f"[bold green]🎉 {issue_type} 处理完成![/bold green]")
    return True
def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]:
"""
调用 LLM 分析工单返回结构化变更计划

View File

@ -0,0 +1,180 @@
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
from loguru import logger
from rich.console import Console
from .core import BaseGenerator
class EnhanceGenerator(BaseGenerator):
    """Enhancement generator.

    Extends BaseGenerator with the `enhance` command logic: applies a
    feature ticket (e.g. feature.issue) to an existing, already
    initialized project.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-reasoner",
        output_dir: str = "./generated",
        log_file: Optional[str] = None,
        max_concurrency: int = 4
    ):
        """Initialize EnhanceGenerator; all configuration is forwarded to
        BaseGenerator unchanged.

        Args:
            api_key: API key (defaults to the DEEPSEEK_APIKEY env var).
            base_url: API base URL.
            model: Model name to use.
            output_dir: Root output directory of the generated project.
            log_file: Log file path (auto-generated when None).
            max_concurrency: Maximum number of concurrent generation tasks.
        """
        super().__init__(
            api_key=api_key,
            base_url=base_url,
            model=model,
            output_dir=output_dir,
            log_file=log_file,
            max_concurrency=max_concurrency
        )
        logger.info("EnhanceGenerator 初始化完成")

    def process_enhance(self, issue_file_path: Path, output_format: str = "full") -> bool:
        """Core `enhance` flow: read the ticket, analyze the change set,
        generate/modify affected files in dependency order, then update
        the design.

        Args:
            issue_file_path: Path to the feature ticket (e.g. feature.issue).
            output_format: 'full' or 'diff'; defaults to 'full'.

        Returns:
            bool: True on success (including "no changes needed"),
            False on read/analysis/sort failures.
        """
        logger.info(f"开始处理增强工单: {issue_file_path}")
        self.console.print(f"[bold blue]🔧 处理增强工单: {issue_file_path}[/bold blue]")
        # 1. Read the ticket file.
        try:
            with open(issue_file_path, 'r', encoding='utf-8') as f:
                issue_content = f.read()
            logger.debug(f"工单内容长度: {len(issue_content)} 字符")
        except Exception as e:
            logger.error(f"读取工单文件失败: {e}")
            self.console.print(f"[bold red]❌ 读取工单文件失败: {e}[/bold red]")
            return False
        # 2. Ask the LLM to analyze the ticket and produce a change plan.
        # NOTE(review): issue_type here is "feature", while the legacy
        # process_issue used "enhance" — confirm _analyze_issue accepts both.
        try:
            analysis_result = self._analyze_issue(issue_content, "feature")
        except Exception as e:
            logger.error(f"分析工单失败: {e}")
            self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]")
            return False
        affected_files = analysis_result.get("affected_files", [])
        design_updates = analysis_result.get("design_updates", {})
        if not affected_files:
            logger.warning("工单分析未发现需要变更的文件")
            self.console.print("[yellow]⚠ 工单分析未发现需要变更的文件[/yellow]")
            return True  # no changes counts as success
        logger.info(f"分析到 {len(affected_files)} 个受影响文件")
        self.console.print(f"[green]📋 分析到 {len(affected_files)} 个受影响文件[/green]")
        # 3. Topologically sort the affected files by their declared deps.
        file_paths = [af["path"] for af in affected_files]
        dependencies = {af["path"]: af.get("dependencies", []) for af in affected_files}
        try:
            sorted_paths = self._topological_sort(file_paths, dependencies)
            logger.debug(f"拓扑排序结果: {sorted_paths}")
        except ValueError as e:
            logger.error(f"拓扑排序失败,检测到循环依赖: {e}")
            self.console.print(f"[bold red]❌ 拓扑排序失败,检测到循环依赖: {e}[/bold red]")
            return False
        # Re-order the file-info dicts to match the sorted path order.
        sorted_file_infos = []
        for path in sorted_paths:
            for af in affected_files:
                if af["path"] == path:
                    sorted_file_infos.append(af)
                    break
        generated_files = []
        for file_info in sorted_file_infos:
            file_path = file_info["path"]
            action = file_info.get("action", "modify")
            description = file_info.get("description", "")
            deps = file_info.get("dependencies", [])
            # 4. For a modification, load existing content; otherwise create.
            existing_content = None
            if action == "modify":
                full_path = self.output_dir / file_path
                if full_path.exists():
                    try:
                        with open(full_path, 'r', encoding='utf-8') as f:
                            existing_content = f.read()
                        logger.debug(f"读取现有文件内容: {file_path}")
                    except Exception as e:
                        logger.error(f"读取现有文件 {file_path} 失败: {e}")
                        self.console.print(f"[bold red]❌ 读取现有文件 {file_path} 失败: {e}[/bold red]")
                        # Tolerate the read failure: treat it as a new file.
                        existing_content = ""
                        action = "create"
                else:
                    logger.warning(f"文件 {file_path} 不存在,将创建新文件")
                    self.console.print(f"[yellow]⚠ 文件 {file_path} 不存在,将创建新文件[/yellow]")
                    action = "create"
            # 5. Generate or modify the file via generate_file.
            instruction = (
                f"根据需求工单 '{issue_file_path.name}' 和变更描述 '{description}'"
                f"{action} 文件 '{file_path}'。请确保符合项目设计。"
            )
            try:
                code, desc, commands = self.generate_file(
                    file_path=file_path,
                    prompt_instruction=instruction,
                    dependency_files=deps,
                    existing_content=existing_content,
                    output_format=output_format
                )
                # NOTE(review): no explicit write here — this assumes
                # generate_file itself writes the file and runs its commands,
                # while InitGenerator/FixGenerator write the returned code
                # explicitly. Confirm which contract generate_file provides.
                generated_files.append(file_path)
                logger.info(f"文件处理完成: {file_path} - {desc}")
                self.console.print(f"[green]✅ 文件处理完成: {file_path} - {desc}[/green]")
            except Exception as e:
                logger.error(f"生成文件 {file_path} 失败: {e}")
                self.console.print(f"[bold red]❌ 生成文件 {file_path} 失败: {e}[/bold red]")
                # Record the failure, keep processing the remaining files.
                continue
        # 6. Update design.json to reflect the change set.
        if generated_files:
            try:
                self._update_design(generated_files, design_updates)
                logger.info(f"已更新 design.json包含 {len(generated_files)} 个文件变更")
                self.console.print(f"[green]✅ 已更新 design.json[/green]")
            except Exception as e:
                logger.error(f"更新 design.json 失败: {e}")
                self.console.print(f"[bold red]❌ 更新 design.json 失败: {e}[/bold red]")
                # Do not fail the run: files are already on disk; log only.
        # 7. Optionally run project-level commands from the design.
        # NOTE(review): unlike the legacy process_issue, this method never
        # loads design.json itself — self.design may be unset here unless
        # BaseGenerator/_analyze_issue populated it; verify.
        if self.design and self.design.commands:
            logger.info("开始执行项目命令")
            for cmd in self.design.commands:
                success = self.execute_command(cmd, cwd=self.output_dir)
                if not success:
                    logger.warning(f"命令执行失败,但继续: {cmd}")
        logger.info("增强处理流程完成")
        self.console.print("[bold green]🎉 增强处理流程完成[/bold green]")
        return True

View File

@ -0,0 +1,146 @@
import json
from pathlib import Path
from typing import List, Optional
from .core import BaseGenerator
from .models import OutputFormat
class FixGenerator(BaseGenerator):
    """Bug-fix generator: extends BaseGenerator with the `fix` command logic."""

    def __init__(self, **kwargs):
        """Initialize FixGenerator; all keyword args go to BaseGenerator."""
        super().__init__(**kwargs)

    def process_fix(
        self,
        bug_issue_path: Path,
        output_format: OutputFormat = OutputFormat.FULL,
    ) -> bool:
        """Core `fix` flow: read the bug ticket, analyze the change set,
        generate and apply the fixed code.

        NOTE(review): this class uses `self.logger` throughout, while the
        sibling generators use loguru's module-level `logger` — confirm
        BaseGenerator actually sets a `logger` attribute, otherwise every
        `self.logger` access raises AttributeError.

        Args:
            bug_issue_path: Path to the bug ticket (e.g. bug.issue).
            output_format: Output format; FULL by default, DIFF for diffs.

        Returns:
            bool: True when at least one file was processed successfully.
        """
        # Read the bug ticket file.
        try:
            with open(bug_issue_path, "r", encoding="utf-8") as f:
                issue_content = f.read()
        except Exception as e:
            self.logger.error(f"读取 Bug 工单文件失败: {e}")
            self.console.print(f"[bold red]❌ 读取 Bug 工单文件失败: {e}[/bold red]")
            return False
        # Ask the LLM to analyze the ticket and produce a change plan.
        try:
            analysis = self._analyze_issue(issue_content, "bug")
        except Exception as e:
            self.logger.error(f"分析工单失败: {e}")
            self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]")
            return False
        affected_files = analysis.get("affected_files", [])
        design_updates = analysis.get("design_updates", {})
        successful_files = []
        for file_info in affected_files:
            file_path = file_info.get("path")
            action = file_info.get("action")  # 'create' or 'modify'
            description = file_info.get("description", "")
            dependencies = file_info.get("dependencies", [])
            if action == "modify":
                # Modify an existing file: read its current content first.
                full_path = self.output_dir / file_path
                if not full_path.exists():
                    self.logger.error(f"文件不存在,无法修改: {file_path}")
                    self.console.print(f"[bold red]❌ 文件不存在,无法修改: {file_path}[/bold red]")
                    continue
                try:
                    with open(full_path, "r", encoding="utf-8") as f:
                        existing_content = f.read()
                except Exception as e:
                    self.logger.error(f"读取文件 {file_path} 失败: {e}")
                    self.console.print(f"[bold red]❌ 读取文件 {file_path} 失败: {e}[/bold red]")
                    continue
                # Generate the fixed code.
                # NOTE(review): `description` is collected but unused — the
                # instruction embeds the raw ticket text instead.
                instruction = (
                    f"根据 Bug 工单修复文件 '{file_path}'。工单摘要: {issue_content[:200]}..."
                )
                code, desc, commands = self.generate_file(
                    file_path=file_path,
                    prompt_instruction=instruction,
                    dependency_files=dependencies,
                    existing_content=existing_content,
                    output_format=output_format,
                )
                # Write the fixed content back to disk.
                try:
                    with open(full_path, "w", encoding="utf-8") as f:
                        f.write(code)
                    self.logger.info(f"已修复文件: {file_path} - {desc}")
                    successful_files.append(file_path)
                except Exception as e:
                    self.logger.error(f"写入文件 {file_path} 失败: {e}")
                    self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]")
                    continue
                # Run any commands associated with this file.
                for cmd in commands:
                    self.execute_command(cmd, cwd=self.output_dir)
                # Refresh this file's entry in design.json.
                # NOTE(review): `update_file_entry` is not visible in the
                # BaseGenerator code shown here (siblings call _update_design)
                # — verify it exists.
                self.update_file_entry(file_path, code)
            elif action == "create":
                # Create a new file: no existing content needed.
                instruction = (
                    f"根据 Bug 工单创建文件 '{file_path}'。工单摘要: {issue_content[:200]}..."
                )
                code, desc, commands = self.generate_file(
                    file_path=file_path,
                    prompt_instruction=instruction,
                    dependency_files=dependencies,
                    existing_content=None,
                    output_format=output_format,
                )
                # Write the new file, creating parent directories.
                full_path = self.output_dir / file_path
                full_path.parent.mkdir(parents=True, exist_ok=True)
                try:
                    with open(full_path, "w", encoding="utf-8") as f:
                        f.write(code)
                    self.logger.info(f"已创建文件: {file_path} - {desc}")
                    successful_files.append(file_path)
                except Exception as e:
                    self.logger.error(f"创建文件 {file_path} 失败: {e}")
                    self.console.print(f"[bold red]❌ 创建文件 {file_path} 失败: {e}[/bold red]")
                    continue
                for cmd in commands:
                    self.execute_command(cmd, cwd=self.output_dir)
                # Add the new entry to design.json.
                # NOTE(review): _update_design is invoked again below for all
                # successful files, so created files are updated twice —
                # confirm this double update is intended/idempotent.
                self._update_design([file_path], design_updates)
        # Apply the overall design updates for everything that succeeded.
        if design_updates and successful_files:
            self._update_design(successful_files, design_updates)
        if successful_files:
            self.logger.info(f"修复成功,处理了 {len(successful_files)} 个文件")
            self.console.print(f"[green]✅ 修复成功,处理了 {len(successful_files)} 个文件[/green]")
            return True
        else:
            self.logger.error("修复失败,没有文件被成功处理")
            self.console.print("[bold red]❌ 修复失败,没有文件被成功处理[/bold red]")
            return False

View File

@ -0,0 +1,108 @@
import json
from pathlib import Path
from typing import Optional
from .core import BaseGenerator
from loguru import logger # 确保日志可用
class InitGenerator(BaseGenerator):
    """Generator for the `init` command: bootstraps a whole project from a
    README, inheriting configuration and helpers from BaseGenerator.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-reasoner",
        output_dir: str = "./generated",
        log_file: Optional[str] = None,
        max_concurrency: int = 4
    ):
        """Initialize InitGenerator.

        Args:
            api_key: API key (defaults to the DEEPSEEK_APIKEY env var).
            base_url: API base URL.
            model: Model name to use.
            output_dir: Root output directory.
            log_file: Log file path.
            max_concurrency: Maximum number of concurrent generation tasks.
        """
        # Positional forwarding: order must match BaseGenerator.__init__.
        super().__init__(api_key, base_url, model, output_dir, log_file, max_concurrency)

    def run(self, readme_path: Path) -> None:
        """Handle the `init` command: initialize a project from README.md.

        Parses the README, generates design.json, then generates every file
        sequentially in topological (dependency) order.

        NOTE(review): unlike the legacy concurrent BaseGenerator.run, this
        loop is sequential and never uses max_concurrency; parse_readme /
        generate_design_json / generate_file calls are also uncaught here,
        so any failure aborts with a traceback — confirm both are intended.

        Args:
            readme_path: Path to the README file.
        """
        logger.info(f"开始初始化项目README路径: {readme_path}")
        self.console.print(f"[bold]🚀 开始初始化项目...[/bold]")
        # 1. Read the README.
        self.readme_content = self.parse_readme(readme_path)
        logger.info("README读取完成")
        # 2. Generate design.json.
        self.design = self.generate_design_json()
        logger.info("design.json生成完成")
        # 3. Derive the file list and dependency graph from the design.
        files, dependencies = self.get_project_structure()
        logger.info(f"获取到 {len(files)} 个待生成文件")
        # 4. Topological sort to guarantee dependency order.
        try:
            sorted_files = self._topological_sort(files, dependencies)
            logger.info(f"拓扑排序完成,文件顺序: {sorted_files}")
        except ValueError as e:
            logger.error(f"拓扑排序失败: {e}")
            self.console.print(f"[bold red]❌ 拓扑排序失败: {e}[/bold red]")
            return
        # 5. Generate each file in order.
        generated_files = set()
        for i, file_path in enumerate(sorted_files, 1):
            logger.info(f"正在生成文件 {i}/{len(sorted_files)}: {file_path}")
            self.console.print(f"[cyan]生成文件 {i}/{len(sorted_files)}: {file_path}[/cyan]")
            # Instruction for the LLM.
            instruction = f"请根据README描述和依赖文件生成文件 '{file_path}' 的完整代码。"
            # Only pass dependencies that were already generated as context.
            available_deps = [dep for dep in dependencies.get(file_path, []) if dep in generated_files]
            # Generate the file content.
            code, desc, commands = self.generate_file(
                file_path, instruction, available_deps, output_format="full"
            )
            logger.info(f"文件生成完成: {file_path} - {desc}")
            # Write it to disk, creating parent directories.
            output_path = self.output_dir / file_path
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(code)
            logger.info(f"文件已写入: {output_path}")
            generated_files.add(file_path)
            # Run any commands associated with this file (best-effort).
            for cmd in commands:
                logger.info(f"执行命令: {cmd}")
                success = self.execute_command(cmd, cwd=self.output_dir)
                if not success:
                    logger.warning(f"命令执行失败,但继续处理: {cmd}")
                    self.console.print(f"[yellow]⚠ 命令执行失败: {cmd}[/yellow]")
        # 6. Persist resume state.
        self.save_state(list(generated_files), dependencies)
        logger.info("状态已保存")
        # 7. Optionally run project-level commands (install deps, tests, …).
        if self.design.commands:
            logger.info("执行项目级命令")
            for cmd in self.design.commands:
                logger.info(f"执行项目命令: {cmd}")
                self.execute_command(cmd, cwd=self.output_dir)
        self.console.print("[green]✅ 项目初始化完成![/green]")
        logger.info("项目初始化流程完成")