llmcodegen/issues/refactor-split-core-generat...

1250 lines
59 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 需求工单: 拆分 core.py 为多个子类文件
name: 重构 core.py拆分为基类和子命令专用类
description: |
当前 `core.py` 中的 `CodeGenerator` 类承担了所有子命令(init、enhance、fix)的实现逻辑,导致文件过大、职责不清晰。
为了提高代码可维护性和可扩展性,需要将 `CodeGenerator` 重构为一个基类(例如 `BaseGenerator`),包含公共方法(如 `_call_llm`、`_topological_sort`、`execute_command` 等),
然后为每个子命令创建独立的子类,分别放在单独的文件中:
- `InitGenerator`(文件:`init_generator.py`):处理 `init` 命令的逻辑(原 `run` 方法)。
- `EnhanceGenerator`(文件:`enhance_generator.py`):处理 `enhance` 命令的逻辑(原 `process_issue` 方法中与 enhance 相关的部分)。
- `FixGenerator`(文件:`fix_generator.py`):处理 `fix` 命令的逻辑(原 `process_issue` 方法中与 fix 相关的部分)。
同时,保留 `core.py` 作为基类文件(或更名为 `base_generator.py`),原有 `CodeGenerator` 类改为 `BaseGenerator`,并将公共方法保留在其中。
命令行接口(`cli.py`)中对应命令的实例化部分需要改为使用对应的子类。
注意:`process_issue` 方法目前同时被 `enhance` 和 `fix` 使用,可以根据 `issue_type` 参数决定行为,拆分后应在两个子类中分别实现各自逻辑,避免重复。
affected_files:
- src/llm_codegen/core.py
- src/llm_codegen/cli.py
- src/llm_codegen/init_generator.py # 新建
- src/llm_codegen/enhance_generator.py # 新建
- src/llm_codegen/fix_generator.py # 新建
acceptance_criteria:
- 基类 `BaseGenerator` 包含以下公共方法:`__init__`、`_call_llm`、`_topological_sort`、`execute_command`、`_apply_diff`、`generate_file`、`parse_readme`、`generate_design_json`、`load_state`、`save_state`、`get_project_structure`、`_add_implicit_dependencies`、`_generate_file_task`(这些方法在现有 CodeGenerator 中属于通用功能)。
- `InitGenerator` 继承自 `BaseGenerator`,包含 `run` 方法(原 `CodeGenerator.run` 逻辑),并可能根据需要进行调整。
- `EnhanceGenerator` 继承自 `BaseGenerator`,包含 `process_enhance` 方法(或直接实现 `process_issue` 但仅处理 enhance 类型),以及可能需要的辅助方法。
- `FixGenerator` 继承自 `BaseGenerator`,包含 `process_fix` 方法(处理 fix 类型)。
- 所有新建文件符合项目代码风格,包含适当的类型注解和文档字符串。
- `cli.py` 中对应命令(init、enhance、fix)分别实例化对应的子类并调用相应方法,保持原有命令行行为不变。
- 原有的 `process_issue` 方法(同时处理 enhance/fix)应被移除,确保职责分离。
- 项目能够正常通过所有现有测试,且功能与原版一致。
- 代码检查工具(pylint、mypy、black)无新增错误或警告。
> 初始core.py实现
```python
import json
import os
import subprocess
import sys
import concurrent.futures
import pendulum
from typing import List, Dict, Optional, Any, Tuple
from pathlib import Path
from collections import deque
import threading
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID
from loguru import logger
from openai import OpenAI
from .utils import is_dangerous_command
from .models import DesignModel, StateModel, FileModel, FileStatus # 添加 FileStatus 导入
from .diff_applier import parse_diff, apply_diff
class CodeGenerator:
    """Code generator: encapsulates design-file creation, resumable (checkpointed) generation, and command execution."""
def __init__(
    self,
    api_key: Optional[str] = None,
    base_url: str = "https://api.deepseek.com",
    model: str = "deepseek-reasoner",
    output_dir: str = "./generated",
    log_file: Optional[str] = None,
    max_concurrency: int = 4
):
    """
    Initialize the generator.
    Args:
        api_key: API key; falls back to the DEEPSEEK_APIKEY environment variable.
        base_url: Base URL of the OpenAI-compatible API.
        model: Model name to use.
        output_dir: Root directory for generated output.
        log_file: Log file path; auto-generated under output_dir when None.
        max_concurrency: Maximum number of concurrent generation workers.
    Raises:
        ValueError: If no API key is provided or found in the environment.
    """
    self.api_key = api_key or os.getenv("DEEPSEEK_APIKEY")
    if not self.api_key:
        raise ValueError("必须提供API密钥或设置环境变量DEEPSEEK_APIKEY")
    self.client = OpenAI(api_key=self.api_key, base_url=base_url)
    self.model = model
    self.output_dir = Path(output_dir)
    self.output_dir.mkdir(parents=True, exist_ok=True)
    # Resume-state file lives inside the output directory.
    self.state_file = self.output_dir / ".llm_generator_state.json"
    self.console = Console()  # rich console for user-facing output
    self._state_lock = threading.Lock()  # serializes save_state() across workers
    self.max_concurrency = max_concurrency
    # Logging: WARNING and above to stderr, full DEBUG detail to the log file.
    if log_file is None:
        log_file = self.output_dir / "generator.log"
    logger.remove()  # drop loguru's default handler
    logger.add(sys.stderr, level="WARNING")
    logger.add(log_file, rotation="10 MB", level="DEBUG")
    logger.info(f"日志已初始化,保存至: {log_file}")
    self.readme_content = None
    self.design: Optional[DesignModel] = None
    self.state: Optional[StateModel] = None
    self.progress: Optional[Progress] = None
    self.tasks: Dict[str, TaskID] = {}  # file -> progress TaskID mapping
def _call_llm(
    self,
    system_prompt: str,
    user_prompt: str,
    temperature: float = 0.2,
    expect_json: bool = True,
) -> Dict[str, Any]:
    """
    Call the LLM and return the parsed response.

    Every raw response is persisted under <output_dir>/llm_responses for auditing.

    Args:
        system_prompt: System-role prompt sent to the model.
        user_prompt: User-role prompt sent to the model.
        temperature: Sampling temperature.
        expect_json: When True, request a JSON object and parse it;
            otherwise the raw text is wrapped as {"content": ...}.

    Returns:
        The parsed JSON object, or {"content": raw_text} when expect_json is False.

    Raises:
        ValueError: If the model returns empty content or invalid JSON.
    """
    logger.debug(f"调用LLM模型: {self.model}")
    logger.debug(f"System: {system_prompt[:200]}...")
    logger.debug(f"User: {user_prompt[:200]}...")
    content = None  # keep bound for the error paths below
    try:
        # Build kwargs conditionally: explicitly passing response_format=None
        # can be rejected by the API, so omit the key unless JSON is requested.
        request_kwargs: Dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": temperature,
        }
        if expect_json:
            request_kwargs["response_format"] = {"type": "json_object"}
        response = self.client.chat.completions.create(**request_kwargs)
        message = response.choices[0].message
        content = message.content
        # Record the model's reasoning trace if present (reasoner models).
        reasoning_content = None
        if hasattr(message, "reasoning_content") and message.reasoning_content:
            reasoning_content = message.reasoning_content
            logger.info("模型思考过程已记录")
        # Persist the raw response for later auditing/debugging.
        responses_dir = self.output_dir / "llm_responses"
        responses_dir.mkdir(parents=True, exist_ok=True)
        timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS")
        response_file = responses_dir / f"response_{timestamp}.json"
        response_data = {
            "timestamp": timestamp,
            "model": self.model,
            "content": content,
            "reasoning_content": reasoning_content,
            "system_prompt": system_prompt,
            "user_prompt": user_prompt,
            "temperature": temperature,
            "expect_json": expect_json
        }
        with open(response_file, "w", encoding="utf-8") as f:
            json.dump(response_data, f, indent=2, ensure_ascii=False)
        logger.debug(f"LLM原始响应: {response_file.name}")
        if content is None:
            # Fail loudly instead of letting json.loads(None) raise an
            # opaque TypeError further down.
            raise ValueError("LLM返回内容为空")
        if expect_json:
            result = json.loads(content)
        else:
            result = {"content": content}
        return result
    except json.JSONDecodeError as e:
        logger.error(f"JSON解析失败: {e}")
        self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]")
        raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}")
    except Exception as e:
        logger.error(f"LLM调用失败: {e}")
        self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]")
        raise
def parse_readme(self, readme_path: Path) -> str:
    """Read and return the full text of the README file.

    Args:
        readme_path: Path to the README file.

    Returns:
        The file's content as a string.

    Raises:
        Exception: Re-raises any error encountered while reading.
    """
    logger.info(f"读取README文件: {readme_path}")
    try:
        with open(readme_path, "r", encoding="utf-8") as fh:
            text = fh.read()
        logger.debug(f"README内容长度: {len(text)} 字符")
    except Exception as e:
        logger.error(f"读取README失败: {e}")
        self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]")
        raise
    return text
def generate_design_json(self) -> DesignModel:
    """Ask the LLM to produce design.json and parse it into a DesignModel.

    The resulting design is also written to <output_dir>/design.json.

    Returns:
        The parsed DesignModel instance.
    """
    system_prompt = (
        "你是一个软件架构师。请根据README描述生成项目的中间设计文件design.json。"
        "design.json应包含项目名称、版本、描述、文件列表含路径、摘要、依赖、函数和类、建议命令和检查工具。"
        "返回严格的 JSON 对象符合DesignModel结构。"
    )
    user_prompt = f"README内容如下\n\n{self.readme_content}"
    design = DesignModel(**self._call_llm(system_prompt, user_prompt))
    # Persist the design document next to the generated sources.
    design_path = self.output_dir / "design.json"
    with open(design_path, "w", encoding="utf-8") as fh:
        json.dump(design.model_dump(), fh, indent=2, ensure_ascii=False)
    logger.info(f"已生成design.json: {design_path}")
    return design
def load_state(self) -> Optional[StateModel]:
    """Load the resume-state file if one exists.

    Returns:
        The parsed StateModel, or None when the state file is absent or
        cannot be loaded.
    """
    if not self.state_file.exists():
        return None
    try:
        with open(self.state_file, "r", encoding="utf-8") as fh:
            self.state = StateModel(**json.load(fh))
        logger.info(f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个")
        return self.state
    except Exception as e:
        logger.error(f"加载状态失败: {e}")
        self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]")
        return None
def save_state(self, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None:
    """Persist the resume state; safe under concurrent generation (thread-safe).

    Args:
        generated_files: Paths of files generated so far.
        dependencies_map: Dependency map used by the scheduler.
    """
    with self._state_lock:  # serialize writes across worker threads
        state = StateModel(
            current_file_index=0,
            generated_files=generated_files,
            dependencies_map=dependencies_map,
            total_files=len(self.design.files) if self.design else 0,
            output_dir=str(self.output_dir),
            # NOTE(review): this stores a prefix of the README *content*, not a
            # path, in the readme_path field — confirm intended semantics.
            readme_path=self.readme_content[:100] if self.readme_content else ""
        )
        with open(self.state_file, "w", encoding="utf-8") as f:
            json.dump(state.model_dump(), f, indent=2, ensure_ascii=False)
        logger.debug(f"状态已保存: {self.state_file}")
def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]:
    """Extract the file list and dependency map from the loaded design.

    Returns:
        A tuple (files, dependencies): files is the ordered list of paths
        to generate; dependencies maps each path to the paths it depends on.

    Raises:
        ValueError: If no design has been loaded yet.
    """
    if not self.design:
        raise ValueError("design.json未加载请先调用generate_design_json")
    files: List[str] = []
    dependencies: Dict[str, List[str]] = {}
    for entry in self.design.files:
        files.append(entry.path)
        dependencies[entry.path] = entry.dependencies
    logger.info(f"从design.json解析到 {len(files)} 个待生成文件")
    logger.debug(f"文件列表: {files}")
    logger.debug(f"依赖关系: {dependencies}")
    return files, dependencies
def _add_implicit_dependencies(self, files: List[str], dependencies: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""
添加隐式依赖关系,基于文件路径和常见模式
Args:
files: 文件路径列表
dependencies: 原始依赖字典
Returns:
Dict[str, List[str]]: 增强后的依赖字典
"""
enhanced = dependencies.copy()
for file in files:
if file not in enhanced:
enhanced[file] = []
# 添加同一目录下的其他文件作为隐式依赖(简单示例)
path = Path(file)
implicit_deps = [
f for f in files
if f != file and Path(f).parent == path.parent and f not in enhanced[file]
]
if implicit_deps:
enhanced[file].extend(implicit_deps)
logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}")
return enhanced
def _apply_diff(self, diff: str, original_content: str) -> str:
"""
应用 unified diff 到原始内容,返回修改后的内容。
Args:
diff: 字符串形式的 unified diff
original_content: 原始文件内容
Returns:
str: 应用 diff 后的内容
Raises:
Exception: 如果应用 diff 失败
"""
try:
# 解析 diff 行
diff_lines = diff.splitlines(keepends=True)
if not diff_lines:
raise ValueError("diff 为空")
# 简单的 diff 应用逻辑:假设 diff 是标准 unified diff逐行处理
# 注意:这是一个简化实现,对于复杂 diff 可能不准确,建议使用专用库如 `patch`
original_lines = original_content.splitlines(keepends=True)
result_lines = []
i = 0
j = 0
while i < len(diff_lines):
line = diff_lines[i]
if line.startswith('--- ') or line.startswith('+++ '):
i += 1
continue
elif line.startswith('@@ '):
i += 1
continue
elif line.startswith(' '):
# 未修改行
if j < len(original_lines):
result_lines.append(original_lines[j])
j += 1
i += 1
elif line.startswith('-'):
# 删除行
j += 1
i += 1
elif line.startswith('+'):
# 新增行
result_lines.append(line[1:])
i += 1
else:
i += 1 # 跳过未知行
# 添加剩余原始行
while j < len(original_lines):
result_lines.append(original_lines[j])
j += 1
return ''.join(result_lines)
except Exception as e:
logger.error(f"应用 diff 时出错: {e}")
raise RuntimeError(f"无法应用 diff: {e}")
def generate_file(
    self,
    file_path: str,
    prompt_instruction: str,
    dependency_files: List[str],
    existing_content: Optional[str] = None,
    output_format: str = "full",  # output mode: 'full' (whole file) or 'diff'
) -> Tuple[str, str, List[str]]:
    """
    Generate a single file; returns (code, description, commands).
    Args:
        file_path: Target file path.
        prompt_instruction: Generation instruction for the LLM.
        dependency_files: Dependency files used as context.
        existing_content: Current file content (when modifying).
        output_format: 'full' or 'diff' (see models.py).
    """
    # Assemble context: README, design.json, dependency files, current content.
    context_content = []
    if self.readme_content:
        context_content.append(f"### 项目 README ###\n{self.readme_content}\n")
    # design.json context (optional — failure to read it is non-fatal)
    design_path = self.output_dir / "design.json"
    if design_path.exists():
        try:
            with open(design_path, "r", encoding="utf-8") as f:
                design_content = f.read()
            context_content.append(f"### 设计文件: design.json ###\n{design_content}\n")
        except Exception as e:
            logger.error(f"读取design.json失败: {e}")
            self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]")
            # continue with partial (or empty) context
    # Dependency files: try the raw path first, then relative to output_dir.
    for dep in dependency_files:
        dep_path = Path(dep)
        if not dep_path.exists():
            alt_path = self.output_dir / dep
            if alt_path.exists():
                dep_path = alt_path
            else:
                logger.warning(f"依赖文件不存在,已跳过: {dep}")
                self.console.print(f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]")
                continue
        try:
            with open(dep_path, "r", encoding="utf-8") as f:
                content = f.read()
            context_content.append(f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n")
        except Exception as e:
            logger.error(f"读取依赖文件 {dep} 失败: {e}")
            self.console.print(f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]")
            # unreadable dependency: skip it
    # Include the current file content when modifying an existing file.
    if existing_content is not None:
        context_content.append(f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n")
    full_context = "\n".join(context_content)
    # Pick the system prompt matching the requested output format.
    if output_format == "diff":
        if existing_content is None:
            # diff mode is meaningless without a baseline to diff against
            logger.error("对于 output_format='diff',必须提供 existing_content")
            self.console.print("[bold red]❌ 对于 output_format='diff',必须提供 existing_content[/bold red]")
            return "# 错误:缺少现有内容", "生成失败,缺少现有内容", []
        system_prompt = (
            "你是一个专业的编程助手。根据用户指令和提供的上下文文件生成文件的差异diff。"
            "返回严格的 JSON 对象,包含四个字段:\n"
            "- diff: (string) 文件的差异,使用 unified diff 格式\n"
            "- description: (string) 简短的中文修改描述\n"
            "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表,若无则返回空数组\n"
            "- output_format: (string) 应为 'diff'"
        )
    else:
        # output_format is 'full' (or anything else): ask for the whole file
        if existing_content is not None:
            system_prompt = (
                "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。"
                "返回严格的 JSON 对象,包含四个字段:\n"
                "- code: (string) 修改后的完整代码\n"
                "- description: (string) 简短的中文修改描述\n"
                "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n"
                "- output_format: (string) 应为 'full'"
            )
        else:
            system_prompt = (
                "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。"
                "返回严格的 JSON 对象,包含四个字段:\n"
                "- code: (string) 生成的完整代码\n"
                "- description: (string) 简短的中文功能描述\n"
                "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n"
                "- output_format: (string) 应为 'full'"
            )
    user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}"
    if output_format == "diff":
        user_prompt += f"\noutput_format: {output_format}"
    try:
        result = self._call_llm(system_prompt, user_prompt)
        # Parse the response according to the requested output format.
        if output_format == "diff":
            diff = result.get("diff")
            description = result.get("description", "")
            commands = result.get("commands", [])
            result.get("output_format", "diff")  # NOTE(review): no-op; return value unused
            if diff is None:
                raise ValueError("LLM 响应中没有 diff 字段")
            # Apply the diff via the diff_applier module.
            try:
                chunks = parse_diff(diff)
                code, conflicts = apply_diff(existing_content, chunks)
                if conflicts:
                    logger.warning(f"应用diff时发现冲突: {conflicts}")
                    # conflicts are logged but processing continues
            except Exception as e:
                logger.error(f"应用 diff 时发生意外错误: {e}")
                self.console.print(f"[bold red]❌ 应用 diff 时发生意外错误: {e}[/bold red]")
                return "# 应用 diff 失败", f"应用 diff 时发生意外错误: {e}", []
            return code, description, commands
        else:
            code = result.get("code")
            description = result.get("description", "")
            commands = result.get("commands", [])
            result.get("output_format", "full")  # NOTE(review): no-op; return value unused
            if code is None:
                raise ValueError("LLM 响应中没有 code 字段")
            return code, description, commands
    except Exception as e:
        logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}")
        self.console.print(f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]")
        # Return placeholders so the overall run can continue.
        return "# 生成失败,请检查日志", "生成失败,发生错误", []
def _generate_file_task(self, file_path: str, dependencies: List[str], generated_files: set) -> Tuple[bool, str]:
    """Worker task: generate one file and run its follow-up commands.

    Args:
        file_path: Path of the file to generate.
        dependencies: Declared dependency paths for this file.
        generated_files: Set of already-generated files usable as context.

    Returns:
        (success, error_message) — error_message is "" on success.
    """
    try:
        instruction = f"请根据README描述和依赖文件生成文件 '{file_path}' 的完整代码。"
        # Only dependencies that are already generated can serve as context.
        ready_deps = [dep for dep in dependencies if dep in generated_files]
        code, desc, commands = self.generate_file(file_path, instruction, ready_deps)
        logger.info(f"生成完成: {file_path} - {desc}")
        # Write the generated content, creating parent directories as needed.
        output_path = self.output_dir / file_path
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as fh:
            fh.write(code)
        logger.info(f"已写入: {output_path}")
        # Run any commands the model asked for; failures are non-fatal.
        for cmd in commands:
            logger.info(f"准备执行命令: {cmd}")
            if not self.execute_command(cmd, cwd=self.output_dir):
                logger.warning(f"命令执行失败,但继续处理: {cmd}")
        return True, ""
    except Exception as e:
        logger.error(f"生成文件 {file_path} 失败: {e}")
        return False, str(e)
def _topological_sort(self, files: List[str], dependencies: Dict[str, List[str]]) -> List[str]:
"""
对文件列表进行拓扑排序,基于依赖关系。
返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。
如果检测到循环依赖抛出ValueError。
"""
from collections import deque
# 初始化入度和反向邻接表
in_degree = {f: 0 for f in files}
rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f
# 构建图如果文件f依赖于dep则增加f的入度并将f加入rev_graph[dep]
for f in files:
for dep in dependencies.get(f, []):
if dep in files: # 只考虑在files中的依赖
in_degree[f] += 1 # f依赖于dep所以f的入度增加
rev_graph[dep].append(f) # dep被f依赖
# 队列初始化为入度为0的文件无依赖的文件
queue = deque([f for f in files if in_degree[f] == 0])
sorted_files = []
while queue:
node = queue.popleft()
sorted_files.append(node)
# 所有依赖于node的文件入度减1
for dependent in rev_graph[node]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
# 检查是否所有文件都已排序(无循环依赖)
if len(sorted_files) != len(files):
raise ValueError(f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。")
return sorted_files
def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool:
    """Run one shell command after a safety check; never raises.

    Args:
        cmd: Shell command to execute.
        cwd: Working directory; defaults to the output directory.

    Returns:
        True if the command ran and exited with code 0.
    """
    # Refuse commands flagged as dangerous by the project policy.
    dangerous, reason = is_dangerous_command(cmd)
    if dangerous:
        logger.error(f"危险命令被阻止: {cmd},原因: {reason}")
        self.console.print(f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]")
        return False
    logger.info(f"执行命令: {cmd}")
    try:
        completed = subprocess.run(
            cmd,
            shell=True,
            cwd=cwd or self.output_dir,
            capture_output=True,
            text=True,
            timeout=300,  # five-minute cap per command
        )
    except subprocess.TimeoutExpired:
        logger.error(f"命令执行超时: {cmd}")
        self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]")
        return False
    except Exception as e:
        logger.error(f"命令执行失败: {e}")
        self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]")
        return False
    logger.debug(f"命令返回码: {completed.returncode}")
    if completed.stdout:
        logger.debug(f"stdout: {completed.stdout[:500]}")
    if completed.stderr:
        logger.warning(f"stderr: {completed.stderr[:500]}")
    if completed.returncode != 0:
        logger.error(f"命令执行失败,返回码: {completed.returncode}")
        self.console.print(f"[bold red]❌ 命令执行失败,返回码: {completed.returncode}[/bold red]")
        return False
    return True
def run(self, readme_path: Path):
    """
    Main entry point: parse the README, build or restore the design, then
    generate all files concurrently in dependency order with resume support.
    Args:
        readme_path: Path to the project README driving generation.
    """
    logger.info("=" * 50)
    logger.info("开始代码生成流程")
    logger.info(f"README: {readme_path}")
    logger.info(f"输出目录: {self.output_dir}")
    # Parse the README
    self.console.print("[bold yellow]🔍 正在解析README...[/bold yellow]")
    try:
        self.readme_content = self.parse_readme(readme_path)
    except Exception as e:
        logger.error(f"解析README失败无法继续: {e}")
        self.console.print(f"[bold red]❌ 解析README失败无法继续: {e}[/bold red]")
        return  # fatal: cannot continue without the README
    # Restore resume state, if any
    state = self.load_state()
    if state:
        self.console.print(f"[green]✅ 检测到断点状态,已生成 {len(state.generated_files)} 个文件[/green]")
        self.state = state
        # Recover the design from disk (design.json should already exist)
        design_path = self.output_dir / "design.json"
        if design_path.exists():
            try:
                with open(design_path, "r", encoding="utf-8") as f:
                    design_data = json.load(f)
                self.design = DesignModel(**design_data)
            except Exception as e:
                logger.error(f"加载design.json失败: {e}")
                self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
                self.console.print("[bold yellow]⚠ design.json损坏重新生成...[/bold yellow]")
                try:
                    self.design = self.generate_design_json()
                except Exception as e2:
                    logger.error(f"重新生成design.json失败: {e2}")
                    self.console.print(f"[bold red]❌ 重新生成design.json失败: {e2}[/bold red]")
                    return
        else:
            self.console.print("[bold yellow]⚠ design.json不存在重新生成...[/bold yellow]")
            try:
                self.design = self.generate_design_json()
            except Exception as e:
                logger.error(f"生成design.json失败: {e}")
                self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]")
                return
    else:
        # Fresh run: generate the design from scratch
        self.console.print("[bold yellow]📋 正在生成设计文件...[/bold yellow]")
        try:
            self.design = self.generate_design_json()
            self.state = None
        except Exception as e:
            logger.error(f"生成design.json失败: {e}")
            self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]")
            return
    # Extract the project structure from the design
    self.console.print("[bold yellow]📋 正在分析项目结构...[/bold yellow]")
    try:
        files, dependencies = self.get_project_structure()
    except Exception as e:
        logger.error(f"获取项目结构失败: {e}")
        self.console.print(f"[bold red]❌ 获取项目结构失败: {e}[/bold red]")
        return
    self.console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]")
    # Implicit dependencies (currently disabled)
    # dependencies = self._add_implicit_dependencies(files, dependencies)
    # logger.info("已添加隐式依赖")
    # Topological sort validates the dependency graph (detects cycles)
    try:
        sorted_files = self._topological_sort(files, dependencies)
        logger.info(f"拓扑排序成功,文件顺序: {sorted_files}")
    except ValueError as e:
        logger.error(f"依赖关系错误: {e}")
        self.console.print(f"[bold red]❌ 依赖关系错误: {e}[/bold red]")
        return  # abort generation
    # Resume: files already generated in a previous run are skipped
    generated_files_set = set(self.state.generated_files if self.state else [])
    # Build the DAG in-degrees used for scheduling
    in_degree = {file: len(dependencies.get(file, [])) for file in files}
    # Seed the queue with files that have no dependencies and are not yet generated
    queue = deque([f for f in files if in_degree[f] == 0 and f not in generated_files_set])
    processed_files = set(generated_files_set)  # files finished (now or previously)
    remaining_files = set(files) - processed_files
    # Progress display
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        console=self.console,
    ) as progress:
        self.progress = progress
        total_task = progress.add_task("[cyan]整体进度...", total=len(remaining_files))
        progress.update(total_task, completed=len(processed_files) - len(generated_files_set))
        # Per-file progress task IDs
        file_tasks = {}
        # Concurrent scheduling: submit ready files, collect results, unlock dependents
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
            futures = {}
            while queue or futures:
                # Submit everything currently ready
                while queue:
                    file = queue.popleft()
                    future = executor.submit(self._generate_file_task, file, dependencies.get(file, []), processed_files)
                    futures[future] = file
                    # One progress task per in-flight file, labelled with its status
                    task_id = progress.add_task(f"{file} - {FileStatus.GENERATING}", total=1)
                    file_tasks[file] = task_id
                # Wait for at least one task (short timeout keeps the UI live)
                done, not_done = concurrent.futures.wait(futures.keys(), return_when=concurrent.futures.FIRST_COMPLETED, timeout=1.0)
                for future in done:
                    file = futures.pop(future)
                    try:
                        success, error_msg = future.result()
                        # Update this file's progress task with its final status
                        if file in file_tasks:
                            if success:
                                progress.update(file_tasks[file], completed=1, description=f"{file} - {FileStatus.SUCCESS}")
                                progress.remove_task(file_tasks[file])
                            else:
                                # mark the task as failed before removing it
                                progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}")
                                progress.remove_task(file_tasks[file])
                            del file_tasks[file]
                        if success:
                            processed_files.add(file)
                            # Unlock dependents whose dependencies are now all met
                            for other_file in files:
                                if file in dependencies.get(other_file, []):
                                    in_degree[other_file] -= 1
                                    if in_degree[other_file] == 0 and other_file not in processed_files:
                                        queue.append(other_file)
                            # Persist resume state after each success
                            self.save_state(list(processed_files), dependencies)
                            progress.update(total_task, advance=1)
                        else:
                            logger.error(f"文件 {file} 生成失败,错误: {error_msg}")
                            self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]")
                            # keep going: other files may still succeed
                    except Exception as e:
                        # Exception was stored inside the future itself
                        logger.error(f"任务 {file} 执行时发生异常: {e}")
                        self.console.print(f"[bold red]❌ 任务 {file} 执行时发生异常: {e}[/bold red]")
                        # Treat it as a failure
                        success = False
                        error_msg = str(e)
                        # Same failure handling as the normal-result branch above
                        if file in file_tasks:
                            progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}")
                            progress.remove_task(file_tasks[file])
                            del file_tasks[file]
                        logger.error(f"文件 {file} 生成失败,错误: {error_msg}")
                        self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]")
    logger.success("所有文件处理完成!")
    # Remove the resume-state file once everything is done
    if self.state_file.exists():
        try:
            self.state_file.unlink()
            logger.info("状态文件已清理")
        except Exception as e:
            logger.error(f"清理状态文件失败: {e}")
            self.console.print(f"[bold red]❌ 清理状态文件失败: {e}[/bold red]")
def process_issue(self, issue_content: str, issue_type: str) -> bool:
    """
    Process an enhancement or bug-fix issue ticket.
    Args:
        issue_content: Raw text of the issue ticket.
        issue_type: 'enhance' or 'fix'.
    Returns:
        bool: Whether processing succeeded.
    """
    logger.info(f"开始处理 {issue_type} 工单")
    self.console.print(f"[bold yellow]📋 正在分析 {issue_type} 工单...[/bold yellow]")
    # Load the existing design.json (init must have been run first)
    design_path = self.output_dir / "design.json"
    if not design_path.exists():
        logger.error(f"design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。")
        self.console.print(f"[bold red]❌ design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。[/bold red]")
        return False
    try:
        with open(design_path, "r", encoding="utf-8") as f:
            design_data = json.load(f)
        self.design = DesignModel(**design_data)
    except Exception as e:
        logger.error(f"加载design.json失败: {e}")
        self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
        return False
    # Load README content for context, if available
    readme_path = self.output_dir / "README.md"
    if readme_path.exists():
        try:
            with open(readme_path, "r", encoding="utf-8") as f:
                self.readme_content = f.read()
        except Exception as e:
            logger.error(f"读取README.md失败: {e}")
            self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]")
            self.readme_content = ""
    else:
        self.readme_content = ""
    # Step 1: analyze the ticket into a structured change plan
    try:
        change_plan = self._analyze_issue(issue_content, issue_type)
    except Exception as e:
        logger.error(f"分析工单失败: {e}")
        self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]")
        return False
    if not change_plan:
        logger.error("无法生成变更计划")
        self.console.print("[bold red]❌ 无法生成变更计划[/bold red]")
        return False
    affected_files = change_plan.get("affected_files", [])
    if not affected_files:
        logger.warning("工单分析结果未指定任何受影响文件")
        self.console.print("[yellow]⚠ 工单分析结果未指定任何受影响文件[/yellow]")
        return True  # nothing to change counts as success
    self.console.print(f"[green]✅ 分析完成,将处理 {len(affected_files)} 个文件[/green]")
    # Order affected files so dependencies are processed before dependents;
    # edges come from design.json, restricted to files in this change set.
    dependencies_dict = {}
    for file_info in affected_files:
        path = file_info["path"]
        # look up this file's declared dependencies in the design
        deps = []
        for f in self.design.files:
            if f.path == path:
                deps = f.dependencies
                break
        # keep only dependencies that are themselves being changed
        affected_paths_set = set(info["path"] for info in affected_files)
        filtered_deps = [dep for dep in deps if dep in affected_paths_set]
        dependencies_dict[path] = filtered_deps
    # Topologically sort the affected files
    try:
        sorted_paths = self._topological_sort([info["path"] for info in affected_files], dependencies_dict)
    except ValueError as e:
        logger.error(f"依赖关系排序失败: {e}")
        self.console.print(f"[bold red]❌ 依赖关系排序失败: {e}[/bold red]")
        return False  # cyclic plan: abort processing
    # Reorder the plan entries to the sorted order
    file_info_map = {info["path"]: info for info in affected_files}
    sorted_affected_files = [file_info_map[path] for path in sorted_paths]
    # Step 2: process each file in dependency order
    generated_files = []
    for file_info in sorted_affected_files:
        file_path = file_info["path"]
        action = file_info.get("action", "modify")  # 'modify' or 'create'
        description = file_info.get("description", "")
        dependencies = file_info.get("dependencies", [])
        logger.info(f"处理文件: {file_path} (操作: {action})")
        # Read the current content when modifying an existing file
        existing = None
        full_path = self.output_dir / file_path
        if action == "modify" and full_path.exists():
            try:
                with open(full_path, "r", encoding="utf-8") as f:
                    existing = f.read()
            except Exception as e:
                logger.error(f"读取文件 {file_path} 失败: {e}")
                self.console.print(f"[bold red]❌ 读取文件 {file_path} 失败: {e}[/bold red]")
                existing = None  # fall back to treating it as a new file
        elif action == "create" and full_path.exists():
            logger.warning(f"文件 {file_path} 已存在,将覆盖")
            self.console.print(f"[yellow]⚠ 文件 {file_path} 已存在,将覆盖[/yellow]")
            existing = None  # create mode: overwrite regardless
        # Collect the dependency files that actually exist on disk
        dep_paths = []
        missing_deps = []
        for dep in dependencies:
            dep_full = self.output_dir / dep
            if dep_full.exists():
                dep_paths.append(dep)
            else:
                missing_deps.append(dep)
        if missing_deps:
            logger.warning(f"依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}")
            self.console.print(f"[yellow]⚠ 依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}[/yellow]")
        # Build the generation instruction
        instruction = f"请根据工单描述{'修改' if action == 'modify' else '生成'}文件 '{file_path}'.\n"
        instruction += f"工单内容摘要:{description}\n"
        if action == "modify":
            instruction += "请在现有代码基础上进行修改,保持原有风格和功能不变。"
        else:
            instruction += "请生成完整的代码文件。"
        # Generate (or regenerate) the file content
        code, desc, commands = self.generate_file(
            file_path,
            instruction,
            dep_paths,
            existing_content=existing,
            output_format="full",
        )
        logger.info(f"生成完成: {file_path} - {desc}")
        # Write the result to disk
        full_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(code)
            logger.info(f"已写入: {full_path}")
            generated_files.append(file_path)
        except Exception as e:
            logger.error(f"写入文件 {file_path} 失败: {e}")
            self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]")
            # Don't run follow-up commands for a file we failed to write
            commands = []
        # Run any follow-up commands for this file
        for cmd in commands:
            logger.info(f"准备执行命令: {cmd}")
            success = self.execute_command(cmd, cwd=self.output_dir)
            if not success:
                logger.warning(f"命令执行失败,但继续处理: {cmd}")
    # Step 3: update design.json with the newly generated files
    if generated_files:
        """
        try:
            self._update_design(generated_files, change_plan.get("design_updates", {}))
            self.console.print("[green]✅ design.json 已更新[/green]")
        except Exception as e:
            logger.error(f"更新design.json失败: {e}")
            self.console.print(f"[bold red]❌ 更新design.json失败: {e}[/bold red]")
        """
        # NOTE(review): the try/except above is a disabled string literal, so
        # errors from _update_design currently propagate out of this method —
        # confirm whether that is intentional.
        logger.info(f'change_plan: {change_plan}')
        self._update_design(generated_files, change_plan.get("design_updates", {}))
        self.console.print("[green]✅ design.json 已更新[/green]")
    self.console.print(f"[bold green]🎉 {issue_type} 处理完成![/bold green]")
    return True
def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]:
    """Have the LLM analyze an issue ticket into a structured change plan.

    Args:
        issue_content: Raw text of the issue ticket.
        issue_type: Either 'enhance' or 'fix'.

    Returns:
        The change plan parsed from the LLM's JSON response.
    """
    system_prompt = (
        "你是一个软件架构师。根据用户提供的工单内容和现有项目设计文件design.json"
        "分析需要进行的代码变更。返回严格的 JSON 对象,包含以下字段:\n"
        "- affected_files: 数组,每个元素为一个对象,包含:\n"
        " - path: 文件路径(相对于项目根目录)\n"
        " - action: 'create' 或 'modify'\n"
        " - description: 对此文件变更的简短描述\n"
        " - dependencies: 此文件可能依赖的其他文件路径列表(可选)\n"
        "- design_updates: 对象,描述对 design.json 的更新,例如新增的文件条目、修改的摘要等(可选)\n"
        "注意:仅返回 JSON不要包含其他文本。"
    )
    # Provide the current design document as analysis context.
    design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False)
    user_prompt = (
        f"工单类型: {issue_type}\n"
        f"工单内容:\n{issue_content}\n\n"
        f"现有设计文件 (design.json):\n{design_str}"
    )
    return self._call_llm(system_prompt, user_prompt, temperature=0.2)
def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]):
    """Add newly generated files to design.json.

    Files already present in the design are left untouched; each new file
    gets an entry built from design_updates (or defaults). The design file
    is rewritten only when something actually changed.

    Args:
        generated_files: Paths of files written during issue processing.
        design_updates: Optional per-file metadata from the change plan.
    """
    known_paths = {f.path for f in self.design.files}
    updated = False
    for file_path in generated_files:
        if file_path in known_paths:
            continue
        update_info = design_updates.get(file_path, {})
        # Build the new design entry as a FileModel instance.
        entry = FileModel(
            path=file_path,
            summary=update_info.get("summary", "自动生成的新文件"),
            dependencies=update_info.get("dependencies", []),
            functions=update_info.get("functions", []),
            classes=update_info.get("classes", []),
            design_updates=update_info.get("design_updates", {})
        )
        self.design.files.append(entry)
        known_paths.add(file_path)
        updated = True
        logger.info(f"已将新文件 {file_path} 添加到 design.json")
    # Only additions are handled today; extend here for in-place edits
    # (e.g. updating an existing file's summary from design_updates).
    if updated:
        design_path = self.output_dir / "design.json"
        with open(design_path, "w", encoding="utf-8") as fh:
            json.dump(self.design.model_dump(), fh, indent=2, ensure_ascii=False)
        logger.info("design.json 已更新")
def refresh_design(self) -> bool:
    """Regenerate design.json from the current README content.

    Falls back to reading <output_dir>/README.md when no README content
    has been loaded yet.

    Returns:
        True on success, False otherwise.
    """
    logger.info("开始刷新design.json")
    if not self.readme_content:
        # No README in memory — try to load it from the output directory.
        readme_path = self.output_dir / "README.md"
        if not readme_path.exists():
            logger.error("没有README内容且README.md文件不存在无法刷新design")
            self.console.print("[bold red]❌ 没有README内容且README.md文件不存在无法刷新design[/bold red]")
            return False
        try:
            self.readme_content = self.parse_readme(readme_path)
        except Exception as e:
            logger.error(f"读取README.md失败无法刷新design: {e}")
            self.console.print(f"[bold red]❌ 读取README.md失败无法刷新design: {e}[/bold red]")
            return False
    try:
        self.design = self.generate_design_json()
    except Exception as e:
        logger.error(f"重新生成design.json失败: {e}")
        self.console.print(f"[bold red]❌ 重新生成design.json失败: {e}[/bold red]")
        return False
    logger.info("design.json已成功重新生成")
    self.console.print("[green]✅ design.json已重新生成[/green]")
    return True
def update_file_entry(self, file_path: str, file_content: str) -> bool:
"""
更新design.json中单个文件的条目基于提供的文件内容
返回bool表示是否成功
"""
logger.info(f"开始更新design.json中文件条目: {file_path}")
if not self.design:
# 加载现有design.json
design_path = self.output_dir / "design.json"
if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]")
return False
try:
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
self.design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
return False
# 调用LLM分析文件内容返回更新信息
system_prompt = (
"你是一个软件架构师。分析给定的文件内容并返回对design.json中该文件条目的更新。"
"返回严格的JSON对象包含以下字段\n"
"- summary: 文件的新摘要\n"
"- dependencies: 依赖文件列表\n"
"- functions: 函数列表每个对象有name, summary, inputs, outputs\n"
"- classes: 类列表每个对象有name, summary, methods\n"
"注意仅返回JSON不要其他文本。"
)
# 准备当前design.json中该文件的条目信息
current_entry = None
for f in self.design.files:
if f.path == file_path:
current_entry = f.model_dump()
break
user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目如果存在:\n{json.dumps(current_entry, indent=2) if current_entry else '无'}"
try:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
update_info = result
# 查找或创建文件条目
file_model = None
for f in self.design.files:
if f.path == file_path:
file_model = f
break
if file_model is None:
# 创建新条目
file_model = FileModel(
path=file_path,
summary=update_info.get("summary", ""),
dependencies=update_info.get("dependencies", []),
functions=update_info.get("functions", []),
classes=update_info.get("classes", [])
)
self.design.files.append(file_model)
logger.info(f"在design.json中创建了新文件条目: {file_path}")
else:
# 更新现有条目
file_model.summary = update_info.get("summary", file_model.summary)
file_model.dependencies = update_info.get("dependencies", file_model.dependencies)
file_model.functions = update_info.get("functions", file_model.functions)
file_model.classes = update_info.get("classes", file_model.classes)
logger.info(f"更新了design.json中的文件条目: {file_path}")
# 保存更新后的design.json
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"design.json已更新文件条目: {file_path}")
self.console.print(f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]")
return True
except Exception as e:
logger.error(f"更新文件条目失败: {e}")
self.console.print(f"[bold red]❌ 更新文件条目失败: {e}[/bold red]")
return False
def sync_readme(self) -> bool:
"""
同步README.md和design.json确保内容一致性
返回bool表示是否成功
"""
logger.info("开始同步README.md和design.json")
# 读取README.md
readme_path = self.output_dir / "README.md"
if not readme_path.exists():
logger.error(f"README.md不存在于 {self.output_dir}")
self.console.print(f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]")
return False
try:
with open(readme_path, "r", encoding="utf-8") as f:
readme_content = f.read()
except Exception as e:
logger.error(f"读取README.md失败: {e}")
self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]")
return False
# 加载design.json
design_path = self.output_dir / "design.json"
if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]")
return False
try:
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
return False
# 调用LLM比较和同步
system_prompt = (
"你是一个软件架构师。比较README.md内容和design.json识别不一致之处并建议更新。"
"返回严格的JSON对象包含以下字段\n"
"- needs_update: bool, 是否需要更新\n"
"- update_type: 'readme' 或 'design' 或 'both', 指示哪个需要更新\n"
"- updates: 对象,描述具体的更新内容\n"
"注意仅返回JSON不要其他文本。"
)
user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}"
try:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
needs_update = result.get("needs_update", False)
if not needs_update:
logger.info("README.md和design.json已同步无需更新")
self.console.print("[green]✅ README.md和design.json已同步无需更新[/green]")
return True
update_type = result.get("update_type", "")
updates = result.get("updates", {})
if update_type == "readme":
# 更新README.md
new_readme = updates.get("new_readme", readme_content)
with open(readme_path, "w", encoding="utf-8") as f:
f.write(new_readme)
logger.info("已更新README.md")
self.console.print("[green]✅ README.md已更新[/green]")
elif update_type == "design":
# 更新design.json
new_design_data = updates.get("new_design", design.model_dump())
design = DesignModel(**new_design_data)
with open(design_path, "w", encoding="utf-8") as f:
json.dump(new_design_data, f, indent=2, ensure_ascii=False)
logger.info("已更新design.json")
self.console.print("[green]✅ design.json已更新[/green]")
elif update_type == "both":
# 更新两者
new_readme = updates.get("new_readme", readme_content)
new_design_data = updates.get("new_design", design.model_dump())
with open(readme_path, "w", encoding="utf-8") as f:
f.write(new_readme)
design = DesignModel(**new_design_data)
with open(design_path, "w", encoding="utf-8") as f:
json.dump(new_design_data, f, indent=2, ensure_ascii=False)
logger.info("已同步更新README.md和design.json")
self.console.print("[green]✅ README.md和design.json已同步更新[/green]")
else:
logger.warning(f"未知的update_type: {update_type}")
self.console.print(f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]")
return False
return True
except Exception as e:
logger.error(f"同步README.md失败: {e}")
self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]")
return False
```