diff --git a/design.json b/design.json index dae2e85..af53428 100644 --- a/design.json +++ b/design.json @@ -230,6 +230,63 @@ "functions": [], "classes": [], "design_updates": {} + }, + { + "path": "src/llm_codegen/init_generator.py", + "summary": "初始化命令生成器,处理 init 命令逻辑", + "dependencies": [ + "src/llm_codegen/core.py", + "src/llm_codegen/models.py" + ], + "functions": [], + "classes": [ + { + "name": "InitGenerator", + "summary": "继承自 BaseGenerator,包含 run 方法", + "methods": [ + "run" + ] + } + ], + "design_updates": {} + }, + { + "path": "src/llm_codegen/enhance_generator.py", + "summary": "增强命令生成器,处理 enhance 命令逻辑", + "dependencies": [ + "src/llm_codegen/core.py", + "src/llm_codegen/models.py" + ], + "functions": [], + "classes": [ + { + "name": "EnhanceGenerator", + "summary": "继承自 BaseGenerator,包含 process_enhance 方法", + "methods": [ + "process_enhance" + ] + } + ], + "design_updates": {} + }, + { + "path": "src/llm_codegen/fix_generator.py", + "summary": "修复命令生成器,处理 fix 命令逻辑", + "dependencies": [ + "src/llm_codegen/core.py", + "src/llm_codegen/models.py" + ], + "functions": [], + "classes": [ + { + "name": "FixGenerator", + "summary": "继承自 BaseGenerator,包含 process_fix 方法", + "methods": [ + "process_fix" + ] + } + ], + "design_updates": {} } ], "commands": [ diff --git a/issues/refactor-split-core-generator.issue b/issues/refactor-split-core-generator.issue new file mode 100644 index 0000000..5729634 --- /dev/null +++ b/issues/refactor-split-core-generator.issue @@ -0,0 +1,1249 @@ +# 需求工单: 拆分 core.py 为多个子类文件 + +name: 重构 core.py,拆分为基类和子命令专用类 +description: | + 当前 `core.py` 中的 `CodeGenerator` 类承担了所有子命令(init、enhance、fix)的实现逻辑,导致文件过大、职责不清晰。 + 为了提高代码可维护性和可扩展性,需要将 `CodeGenerator` 重构为一个基类(例如 `BaseGenerator`),包含公共方法(如 `_call_llm`、`_topological_sort`、`execute_command` 等), + 然后为每个子命令创建独立的子类,分别放在单独的文件中: + - `InitGenerator`(文件:`init_generator.py`):处理 `init` 命令的逻辑(原 `run` 方法)。 + - `EnhanceGenerator`(文件:`enhance_generator.py`):处理 `enhance` 命令的逻辑(原 `process_issue` 方法中与 enhance 相关的部分)。 + - `FixGenerator`(文件:`fix_generator.py`):处理 `fix` 命令的逻辑(原 `process_issue` 方法中与 fix 相关的部分)。 + + 同时,保留 `core.py` 作为基类文件(或更名为 `base_generator.py`),原有 `CodeGenerator` 类改为 `BaseGenerator`,并将公共方法保留在其中。 + 命令行接口(`cli.py`)中对应命令的实例化部分需要改为使用对应的子类。 + + 注意:`process_issue` 方法目前同时被 `enhance` 和 `fix` 使用,可以根据 `issue_type` 参数决定行为,拆分后应在两个子类中分别实现各自逻辑,避免重复。 + +affected_files: + - src/llm_codegen/core.py + - src/llm_codegen/cli.py + - src/llm_codegen/init_generator.py # 新建 + - src/llm_codegen/enhance_generator.py # 新建 + - src/llm_codegen/fix_generator.py # 新建 + +acceptance_criteria: + - 基类 `BaseGenerator` 包含以下公共方法:`__init__`、`_call_llm`、`_topological_sort`、`execute_command`、`_apply_diff`、`generate_file`、`parse_readme`、`generate_design_json`、`load_state`、`save_state`、`get_project_structure`、`_add_implicit_dependencies`、`_generate_file_task`(这些方法在现有 CodeGenerator 中属于通用功能)。 + - `InitGenerator` 继承自 `BaseGenerator`,包含 `run` 方法(原 `CodeGenerator.run` 逻辑),并可能根据需要进行调整。 + - `EnhanceGenerator` 继承自 `BaseGenerator`,包含 `process_enhance` 方法(或直接实现 `process_issue` 但仅处理 enhance 类型),以及可能需要的辅助方法。 + - `FixGenerator` 继承自 `BaseGenerator`,包含 `process_fix` 方法(处理 fix 类型)。 + - 所有新建文件符合项目代码风格,包含适当的类型注解和文档字符串。 + - `cli.py` 中对应命令(init、enhance、fix)分别实例化对应的子类,并调用相应方法,保持原有命令行行为不变。 + - 原有的 `process_issue` 方法(同时处理 enhance/fix)应被移除,确保职责分离。 + - 项目能够正常通过所有现有测试,且功能与原版一致。 + - 代码检查工具(pylint、mypy、black)无新增错误或警告。 + +> 初始core.py实现: + +```python +import json +import os +import subprocess +import sys +import concurrent.futures +import pendulum +from typing import List, Dict, Optional, Any, Tuple +from pathlib import Path +from collections import deque +import threading + +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID +from loguru import logger +from openai import OpenAI + +from .utils import is_dangerous_command +from .models import DesignModel, StateModel, FileModel, FileStatus # 添加 FileStatus 导入 +from .diff_applier import parse_diff, apply_diff + + +class CodeGenerator: + """代码生成器,封装所有逻辑,支持设计层、断点续写和命令执行""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: str = "https://api.deepseek.com", + model: str = "deepseek-reasoner", + output_dir: str = "./generated", + log_file: Optional[str] = None, + max_concurrency: int = 4 + ): + """ + 初始化生成器 + + Args: + api_key: OpenAI API密钥,默认从环境变量DEEPSEEK_APIKEY读取 + base_url: API基础URL + model: 使用的模型 + output_dir: 输出根目录 + log_file: 日志文件路径,默认自动生成 + """ + self.api_key = api_key or os.getenv("DEEPSEEK_APIKEY") + if not self.api_key: + raise ValueError("必须提供API密钥,或设置环境变量DEEPSEEK_APIKEY") + + self.client = OpenAI(api_key=self.api_key, base_url=base_url) + self.model = model + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + self.state_file = self.output_dir / ".llm_generator_state.json" + self.console = Console() # 添加console实例用于rich打印 + self._state_lock = threading.Lock() + + self.max_concurrency = max_concurrency + + # 配置日志 + if log_file is None: + log_file = self.output_dir / "generator.log" + logger.remove() # 移除默认handler + logger.add(sys.stderr, level="WARNING") # 控制台输出WARNING及以上 + logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG + logger.info(f"日志已初始化,保存至: {log_file}") + + self.readme_content = None + self.design: Optional[DesignModel] = None + self.state: Optional[StateModel] = None + self.progress: Optional[Progress] = None + self.tasks: Dict[str, TaskID] = {} # 任务ID映射 + + def _call_llm( + self, + system_prompt: str, + user_prompt: str, + temperature: float = 0.2, + expect_json: bool = True, + ) -> Dict[str, Any]: + """ + 调用LLM并返回解析后的JSON + """ + logger.debug(f"调用LLM,模型: {self.model}") + logger.debug(f"System: {system_prompt[:200]}...") + logger.debug(f"User: {user_prompt[:200]}...") + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + temperature=temperature, + response_format={"type": "json_object"} if expect_json else None, + ) + + message = response.choices[0].message + content = message.content + + # 记录思考过程(如果存在) + reasoning_content = None + if hasattr(message, "reasoning_content") and message.reasoning_content: + reasoning_content = message.reasoning_content + logger.info("模型思考过程已记录") + + # 创建响应目录 + responses_dir = self.output_dir / "llm_responses" + responses_dir.mkdir(parents=True, exist_ok=True) + + # 生成文件名(使用当前时间) + timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS") + response_file = responses_dir / f"response_{timestamp}.json" + + # 保存响应到JSON文件 + response_data = { + "timestamp": timestamp, + "model": self.model, + "content": content, + "reasoning_content": reasoning_content, + "system_prompt": system_prompt, + "user_prompt": user_prompt, + "temperature": temperature, + "expect_json": expect_json + } + + with open(response_file, "w", encoding="utf-8") as f: + json.dump(response_data, f, indent=2, ensure_ascii=False) + + logger.debug(f"LLM原始响应: {response_file.name}") + + if expect_json: + result = json.loads(content) + else: + result = {"content": content} + + return result + + except json.JSONDecodeError as e: + logger.error(f"JSON解析失败: {e}") + self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]") + raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}") + except Exception as e: + logger.error(f"LLM调用失败: {e}") + self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") + raise + + + def parse_readme(self, readme_path: Path) -> str: + """ + 读取README文件内容 + """ + logger.info(f"读取README文件: {readme_path}") + try: + with open(readme_path, "r", encoding="utf-8") as f: + content = f.read() + logger.debug(f"README内容长度: {len(content)} 字符") + return content + except Exception as e: + logger.error(f"读取README失败: {e}") + self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]") + raise + + def generate_design_json(self) -> DesignModel: + """ + 调用LLM生成design.json内容,并解析为DesignModel + """ + system_prompt = ( + "你是一个软件架构师。请根据README描述,生成项目的中间设计文件design.json。" + "design.json应包含项目名称、版本、描述、文件列表(含路径、摘要、依赖、函数和类)、建议命令和检查工具。" + "返回严格的 JSON 对象,符合DesignModel结构。" + ) + user_prompt = f"README内容如下:\n\n{self.readme_content}" + + result = self._call_llm(system_prompt, user_prompt) + design_data = result + design = DesignModel(**design_data) + + # 写入design.json文件 + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info(f"已生成design.json: {design_path}") + + return design + + def load_state(self) -> Optional[StateModel]: + """加载断点续写状态""" + if self.state_file.exists(): + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state_data = json.load(f) + self.state = StateModel(**state_data) + logger.info(f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个") + return self.state + except Exception as e: + logger.error(f"加载状态失败: {e}") + self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") + return None + return None + + def save_state(self, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None: + """保存断点续写状态,适应并发生成(线程安全)""" + with self._state_lock: # 串行化写入 + state = StateModel( + current_file_index=0, + generated_files=generated_files, + dependencies_map=dependencies_map, + total_files=len(self.design.files) if self.design else 0, + output_dir=str(self.output_dir), + readme_path=self.readme_content[:100] if self.readme_content else "" + ) + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) + logger.debug(f"状态已保存: {self.state_file}") + + + def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: + """ + 从design.json获取文件列表和依赖关系 + + Returns: + (files, dependencies) + files: 按顺序需要生成的文件路径列表 + dependencies: 字典 {file: [依赖文件路径]} + """ + if not self.design: + raise ValueError("design.json未加载,请先调用generate_design_json") + + files = [file.path for file in self.design.files] + dependencies = {file.path: file.dependencies for file in self.design.files} + + logger.info(f"从design.json解析到 {len(files)} 个待生成文件") + logger.debug(f"文件列表: {files}") + logger.debug(f"依赖关系: {dependencies}") + + return files, dependencies + + def _add_implicit_dependencies(self, files: List[str], dependencies: Dict[str, List[str]]) -> Dict[str, List[str]]: + """ + 添加隐式依赖关系,基于文件路径和常见模式 + + Args: + files: 文件路径列表 + dependencies: 原始依赖字典 + + Returns: + Dict[str, List[str]]: 增强后的依赖字典 + """ + enhanced = dependencies.copy() + for file in files: + if file not in enhanced: + enhanced[file] = [] + # 添加同一目录下的其他文件作为隐式依赖(简单示例) + path = Path(file) + implicit_deps = [ + f for f in files + if f != file and Path(f).parent == path.parent and f not in enhanced[file] + ] + if implicit_deps: + enhanced[file].extend(implicit_deps) + logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}") + return enhanced + + def _apply_diff(self, diff: str, original_content: str) -> str: + """ + 应用 unified diff 到原始内容,返回修改后的内容。 + + Args: + diff: 字符串形式的 unified diff + original_content: 原始文件内容 + + Returns: + str: 应用 diff 后的内容 + + Raises: + Exception: 如果应用 diff 失败 + """ + try: + # 解析 diff 行 + diff_lines = diff.splitlines(keepends=True) + if not diff_lines: + raise ValueError("diff 为空") + + # 简单的 diff 应用逻辑:假设 diff 是标准 unified diff,逐行处理 + # 注意:这是一个简化实现,对于复杂 diff 可能不准确,建议使用专用库如 `patch` + original_lines = original_content.splitlines(keepends=True) + result_lines = [] + i = 0 + j = 0 + while i < len(diff_lines): + line = diff_lines[i] + if line.startswith('--- ') or line.startswith('+++ '): + i += 1 + continue + elif line.startswith('@@ '): + i += 1 + continue + elif line.startswith(' '): + # 未修改行 + if j < len(original_lines): + result_lines.append(original_lines[j]) + j += 1 + i += 1 + elif line.startswith('-'): + # 删除行 + j += 1 + i += 1 + elif line.startswith('+'): + # 新增行 + result_lines.append(line[1:]) + i += 1 + else: + i += 1 # 跳过未知行 + + # 添加剩余原始行 + while j < len(original_lines): + result_lines.append(original_lines[j]) + j += 1 + + return ''.join(result_lines) + except Exception as e: + logger.error(f"应用 diff 时出错: {e}") + raise RuntimeError(f"无法应用 diff: {e}") + + def generate_file( + self, + file_path: str, + prompt_instruction: str, + dependency_files: List[str], + existing_content: Optional[str] = None, + output_format: str = "full", # 新增参数,默认 'full' + ) -> Tuple[str, str, List[str]]: + """ + 生成单个文件,返回 (代码, 描述, 命令列表) + + Args: + file_path: 目标文件路径 + prompt_instruction: 生成指令 + dependency_files: 依赖文件列表(用于上下文) + existing_content: 文件现有内容(若为修改模式) + output_format: 输出格式,'full' 或 'diff',来自 models.py + """ + # 收集上下文内容 + context_content = [] + + if self.readme_content: + context_content.append(f"### 项目 README ###\n{self.readme_content}\n") + + # 添加 design.json 上下文 + design_path = self.output_dir / "design.json" + if design_path.exists(): + try: + with open(design_path, "r", encoding="utf-8") as f: + design_content = f.read() + context_content.append(f"### 设计文件: design.json ###\n{design_content}\n") + except Exception as e: + logger.error(f"读取design.json失败: {e}") + self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]") + # 如果design.json读取失败,可能无法继续,但保持上下文为空或部分 + + # 添加依赖文件内容(仅读取存在的文件) + for dep in dependency_files: + dep_path = Path(dep) + if not dep_path.exists(): + alt_path = self.output_dir / dep + if alt_path.exists(): + dep_path = alt_path + else: + logger.warning(f"依赖文件不存在,已跳过: {dep}") + self.console.print(f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]") + continue + + try: + with open(dep_path, "r", encoding="utf-8") as f: + content = f.read() + context_content.append(f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n") + except Exception as e: + logger.error(f"读取依赖文件 {dep} 失败: {e}") + self.console.print(f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]") + # 跳过此依赖文件 + + # 如果有现有内容,也加入上下文 + if existing_content is not None: + context_content.append(f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n") + + full_context = "\n".join(context_content) + + # 根据 output_format 设置 system_prompt + if output_format == "diff": + if existing_content is None: + logger.error("对于 output_format='diff',必须提供 existing_content") + self.console.print("[bold red]❌ 对于 output_format='diff',必须提供 existing_content[/bold red]") + return "# 错误:缺少现有内容", "生成失败,缺少现有内容", [] + system_prompt = ( + "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成文件的差异(diff)。" + "返回严格的 JSON 对象,包含四个字段:\n" + "- diff: (string) 文件的差异,使用 unified diff 格式\n" + "- description: (string) 简短的中文修改描述\n" + "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表,若无则返回空数组\n" + "- output_format: (string) 应为 'diff'" + ) + else: + # output_format 为 'full' 或其他,保持现有逻辑 + if existing_content is not None: + system_prompt = ( + "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。" + "返回严格的 JSON 对象,包含四个字段:\n" + "- code: (string) 修改后的完整代码\n" + "- description: (string) 简短的中文修改描述\n" + "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" + "- output_format: (string) 应为 'full'" + ) + else: + system_prompt = ( + "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。" + "返回严格的 JSON 对象,包含四个字段:\n" + "- code: (string) 生成的完整代码\n" + "- description: (string) 简短的中文功能描述\n" + "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" + "- output_format: (string) 应为 'full'" + ) + + user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" + if output_format == "diff": + user_prompt += f"\noutput_format: {output_format}" + + try: + result = self._call_llm(system_prompt, user_prompt) + # 解析响应,假设包含 output_format 字段 + if output_format == "diff": + diff = result.get("diff") + description = result.get("description", "") + commands = result.get("commands", []) + result.get("output_format", "diff") + if diff is None: + raise ValueError("LLM 响应中没有 diff 字段") + # 调用 diff_applier 应用 diff + try: + chunks = parse_diff(diff) + code, conflicts = apply_diff(existing_content, chunks) + if conflicts: + logger.warning(f"应用diff时发现冲突: {conflicts}") + # 可以记录冲突,但继续处理 + except Exception as e: + logger.error(f"应用 diff 时发生意外错误: {e}") + self.console.print(f"[bold red]❌ 应用 diff 时发生意外错误: {e}[/bold red]") + return "# 应用 diff 失败", f"应用 diff 时发生意外错误: {e}", [] + return code, description, commands + else: + code = result.get("code") + description = result.get("description", "") + commands = result.get("commands", []) + result.get("output_format", "full") + if code is None: + raise ValueError("LLM 响应中没有 code 字段") + return code, description, commands + except Exception as e: + logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") + self.console.print(f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]") + # 返回默认值以便继续 + return "# 生成失败,请检查日志", "生成失败,发生错误", [] + + def _generate_file_task(self, file_path: str, dependencies: List[str], generated_files: set) -> Tuple[bool, str]: + """ + 并发任务函数,用于生成单个文件 + + Args: + file_path: 文件路径 + dependencies: 依赖文件列表 + generated_files: 已生成文件的集合(用于上下文) + + Returns: + Tuple[bool, str]: (是否成功, 错误信息或空字符串) + """ + try: + instruction = f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" + # 过滤依赖文件,只使用已生成的 + available_deps = [dep for dep in dependencies if dep in generated_files] + code, desc, commands = self.generate_file(file_path, instruction, available_deps) + logger.info(f"生成完成: {file_path} - {desc}") + + # 写入文件 + output_path = self.output_dir / file_path + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(code) + logger.info(f"已写入: {output_path}") + + # 执行命令 + for cmd in commands: + logger.info(f"准备执行命令: {cmd}") + success = self.execute_command(cmd, cwd=self.output_dir) + if not success: + logger.warning(f"命令执行失败,但继续处理: {cmd}") + return True, "" + except Exception as e: + logger.error(f"生成文件 {file_path} 失败: {e}") + return False, str(e) + + def _topological_sort(self, files: List[str], dependencies: Dict[str, List[str]]) -> List[str]: + """ + 对文件列表进行拓扑排序,基于依赖关系。 + 返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。 + 如果检测到循环依赖,抛出ValueError。 + """ + from collections import deque + + # 初始化入度和反向邻接表 + in_degree = {f: 0 for f in files} + rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f + + # 构建图:如果文件f依赖于dep,则增加f的入度,并将f加入rev_graph[dep] + for f in files: + for dep in dependencies.get(f, []): + if dep in files: # 只考虑在files中的依赖 + in_degree[f] += 1 # f依赖于dep,所以f的入度增加 + rev_graph[dep].append(f) # dep被f依赖 + + # 队列初始化为入度为0的文件(无依赖的文件) + queue = deque([f for f in files if in_degree[f] == 0]) + sorted_files = [] + + while queue: + node = queue.popleft() + sorted_files.append(node) + # 所有依赖于node的文件入度减1 + for dependent in rev_graph[node]: + in_degree[dependent] -= 1 + if in_degree[dependent] == 0: + queue.append(dependent) + + # 检查是否所有文件都已排序(无循环依赖) + if len(sorted_files) != len(files): + raise ValueError(f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。") + + return sorted_files + + def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: + """ + 执行单个命令,检查风险,失败仅记录错误不抛出异常 + + Returns: + bool: 命令是否成功执行 + """ + dangerous, reason = is_dangerous_command(cmd) + if dangerous: + logger.error(f"危险命令被阻止: {cmd},原因: {reason}") + self.console.print(f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]") + return False + + logger.info(f"执行命令: {cmd}") + try: + result = subprocess.run( + cmd, + shell=True, + cwd=cwd or self.output_dir, + capture_output=True, + text=True, + timeout=300, # 5分钟超时 + ) + logger.debug(f"命令返回码: {result.returncode}") + if result.stdout: + logger.debug(f"stdout: {result.stdout[:500]}") + if result.stderr: + logger.warning(f"stderr: {result.stderr[:500]}") + if result.returncode != 0: + logger.error(f"命令执行失败,返回码: {result.returncode}") + self.console.print(f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]") + return False + return True + except subprocess.TimeoutExpired: + logger.error(f"命令执行超时: {cmd}") + self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]") + return False + except Exception as e: + logger.error(f"命令执行失败: {e}") + self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]") + return False + + def run(self, readme_path: Path): + """ + 主执行流程,支持基于依赖关系的并发生成 + """ + logger.info("=" * 50) + logger.info("开始代码生成流程") + logger.info(f"README: {readme_path}") + logger.info(f"输出目录: {self.output_dir}") + + # 解析README + self.console.print("[bold yellow]🔍 正在解析README...[/bold yellow]") + try: + self.readme_content = self.parse_readme(readme_path) + except Exception as e: + logger.error(f"解析README失败,无法继续: {e}") + self.console.print(f"[bold red]❌ 解析README失败,无法继续: {e}[/bold red]") + return # 致命错误,退出 + + # 加载状态 + state = self.load_state() + if state: + self.console.print(f"[green]✅ 检测到断点状态,已生成 {len(state.generated_files)} 个文件[/green]") + self.state = state + # 从状态恢复设计,假设design.json已存在 + design_path = self.output_dir / "design.json" + if design_path.exists(): + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + self.console.print("[bold yellow]⚠ design.json损坏,重新生成...[/bold yellow]") + try: + self.design = self.generate_design_json() + except Exception as e2: + logger.error(f"重新生成design.json失败: {e2}") + self.console.print(f"[bold red]❌ 重新生成design.json失败: {e2}[/bold red]") + return + else: + self.console.print("[bold yellow]⚠ design.json不存在,重新生成...[/bold yellow]") + try: + self.design = self.generate_design_json() + except Exception as e: + logger.error(f"生成design.json失败: {e}") + self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]") + return + else: + self.console.print("[bold yellow]📋 正在生成设计文件...[/bold yellow]") + try: + self.design = self.generate_design_json() + self.state = None + except Exception as e: + logger.error(f"生成design.json失败: {e}") + self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]") + return + + # 获取项目结构 + self.console.print("[bold yellow]📋 正在分析项目结构...[/bold yellow]") + try: + files, dependencies = self.get_project_structure() + except Exception as e: + logger.error(f"获取项目结构失败: {e}") + self.console.print(f"[bold red]❌ 获取项目结构失败: {e}[/bold red]") + return + self.console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]") + + # 添加隐式依赖 + # dependencies = self._add_implicit_dependencies(files, dependencies) + # logger.info("已添加隐式依赖") + + # 拓扑排序检查依赖关系 + try: + sorted_files = self._topological_sort(files, dependencies) + logger.info(f"拓扑排序成功,文件顺序: {sorted_files}") + except ValueError as e: + logger.error(f"依赖关系错误: {e}") + self.console.print(f"[bold red]❌ 依赖关系错误: {e}[/bold red]") + return # 退出生成 + + # 断点续写:确定已生成文件 + generated_files_set = set(self.state.generated_files if self.state else []) + + # 构建DAG并计算入度 + in_degree = {file: len(dependencies.get(file, [])) for file in files} + # 初始化队列为入度为0且未生成的节点 + queue = deque([f for f in files if in_degree[f] == 0 and f not in generated_files_set]) + processed_files = set(generated_files_set) # 跟踪已处理文件 + remaining_files = set(files) - processed_files + + # 创建进度条 + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + console=self.console, + ) as progress: + self.progress = progress + total_task = progress.add_task("[cyan]整体进度...", total=len(remaining_files)) + progress.update(total_task, completed=len(processed_files) - len(generated_files_set)) + + # 初始化文件任务映射 + file_tasks = {} # 局部字典,映射文件到任务ID + + # 并发任务调度 + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrency) as executor: + futures = {} + while queue or futures: + # 提交队列中的任务 + while queue: + file = queue.popleft() + future = executor.submit(self._generate_file_task, file, dependencies.get(file, []), processed_files) + futures[future] = file + # 为每个文件添加独立进度任务并保存任务ID,添加状态显示 + task_id = progress.add_task(f"{file} - {FileStatus.GENERATING}", total=1) # 修改:添加状态 + file_tasks[file] = task_id + + # 等待任意任务完成 + done, not_done = concurrent.futures.wait(futures.keys(), return_when=concurrent.futures.FIRST_COMPLETED, timeout=1.0) + for future in done: + file = futures.pop(future) + try: + success, error_msg = future.result() + # 更新文件进度任务,根据状态更新描述 + if file in file_tasks: + if success: + progress.update(file_tasks[file], completed=1, description=f"{file} - {FileStatus.SUCCESS}") # 修改:添加状态 + progress.remove_task(file_tasks[file]) # 移除任务 + else: + # 如果失败,标记为错误状态 + progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}") # 修改:添加状态 + progress.remove_task(file_tasks[file]) + del file_tasks[file] # 清理映射 + if success: + processed_files.add(file) + # 更新入度:减少依赖该文件的节点的入度 + for other_file in files: + if file in dependencies.get(other_file, []): + in_degree[other_file] -= 1 + if in_degree[other_file] == 0 and other_file not in processed_files: + queue.append(other_file) + # 保存状态 + self.save_state(list(processed_files), dependencies) + progress.update(total_task, advance=1) # 更新整体进度 + else: + logger.error(f"文件 {file} 生成失败,错误: {error_msg}") + self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]") + # 错误处理:继续处理其他文件,但记录失败 + except Exception as e: + # 捕获 Future 中存储的异常 + logger.error(f"任务 {file} 执行时发生异常: {e}") + self.console.print(f"[bold red]❌ 任务 {file} 执行时发生异常: {e}[/bold red]") + # 将其视为失败 + success = False + error_msg = str(e) + # 然后执行和上面 `else` 分支相同的失败处理逻辑 + if file in file_tasks: + progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}") # 修改:添加状态 + progress.remove_task(file_tasks[file]) + del file_tasks[file] # 清理映射 + logger.error(f"文件 {file} 生成失败,错误: {error_msg}") + self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]") + # 错误处理:继续处理其他文件,但记录失败 + + + logger.success("所有文件处理完成!") + # 清理状态文件 + if self.state_file.exists(): + try: + self.state_file.unlink() + logger.info("状态文件已清理") + except Exception as e: + logger.error(f"清理状态文件失败: {e}") + self.console.print(f"[bold red]❌ 清理状态文件失败: {e}[/bold red]") + + def process_issue(self, issue_content: str, issue_type: str) -> bool: + """ + 处理需求增强或 Bug 修复工单 + + Args: + issue_content: 工单文件内容(文本) + issue_type: 'enhance' 或 'fix' + + Returns: + bool: 处理是否成功 + """ + logger.info(f"开始处理 {issue_type} 工单") + self.console.print(f"[bold yellow]📋 正在分析 {issue_type} 工单...[/bold yellow]") + + # 加载现有 design.json + design_path = self.output_dir / "design.json" + if not design_path.exists(): + logger.error(f"design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。") + self.console.print(f"[bold red]❌ design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。[/bold red]") + return False + + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + return False + + # 加载 README 内容(如果存在) + readme_path = self.output_dir / "README.md" + if readme_path.exists(): + try: + with open(readme_path, "r", encoding="utf-8") as f: + self.readme_content = f.read() + except Exception as e: + logger.error(f"读取README.md失败: {e}") + self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") + self.readme_content = "" + else: + self.readme_content = "" + + # 步骤1: 分析工单,生成变更计划 + try: + change_plan = self._analyze_issue(issue_content, issue_type) + except Exception as e: + logger.error(f"分析工单失败: {e}") + self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") + return False + if not change_plan: + logger.error("无法生成变更计划") + self.console.print("[bold red]❌ 无法生成变更计划[/bold red]") + return False + + affected_files = change_plan.get("affected_files", []) + if not affected_files: + logger.warning("工单分析结果未指定任何受影响文件") + self.console.print("[yellow]⚠ 工单分析结果未指定任何受影响文件[/yellow]") + return True # 无变更 + + self.console.print(f"[green]✅ 分析完成,将处理 {len(affected_files)} 个文件[/green]") + + # 添加依赖关系排序:解析 design.json 中的依赖,确保依赖项先于被依赖项处理 + # 构建依赖关系字典用于拓扑排序 + dependencies_dict = {} + for file_info in affected_files: + path = file_info["path"] + # 从 design.json 中获取依赖关系 + deps = [] + for f in self.design.files: + if f.path == path: + deps = f.dependencies + break + # 只考虑在 affected_files 中的依赖文件,以确保内部依赖顺序 + affected_paths_set = set(info["path"] for info in affected_files) + filtered_deps = [dep for dep in deps if dep in affected_paths_set] + dependencies_dict[path] = filtered_deps + + # 对 affected_files 进行拓扑排序 + try: + sorted_paths = self._topological_sort([info["path"] for info in affected_files], dependencies_dict) + except ValueError as e: + logger.error(f"依赖关系排序失败: {e}") + self.console.print(f"[bold red]❌ 依赖关系排序失败: {e}[/bold red]") + return False # 排序失败,处理中止 + + # 重新排序 affected_files 基于 sorted_paths + file_info_map = {info["path"]: info for info in affected_files} + sorted_affected_files = [file_info_map[path] for path in sorted_paths] + + # 步骤2: 逐个处理文件(按依赖顺序) + generated_files = [] + for file_info in sorted_affected_files: + file_path = file_info["path"] + action = file_info.get("action", "modify") # modify 或 create + description = file_info.get("description", "") + dependencies = file_info.get("dependencies", []) + + logger.info(f"处理文件: {file_path} (操作: {action})") + + # 读取现有内容(如果是修改) + existing = None + full_path = self.output_dir / file_path + if action == "modify" and full_path.exists(): + try: + with open(full_path, "r", encoding="utf-8") as f: + existing = f.read() + except Exception as e: + logger.error(f"读取文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 读取文件 {file_path} 失败: {e}[/bold red]") + existing = None # 如果读取失败,按新文件处理 + elif action == "create" and full_path.exists(): + logger.warning(f"文件 {file_path} 已存在,将覆盖") + self.console.print(f"[yellow]⚠ 文件 {file_path} 已存在,将覆盖[/yellow]") + existing = None # 创建模式,即使存在也按新文件处理 + + # 收集实际存在的依赖文件 + dep_paths = [] + missing_deps = [] + for dep in dependencies: + dep_full = self.output_dir / dep + if dep_full.exists(): + dep_paths.append(dep) + else: + missing_deps.append(dep) + if missing_deps: + logger.warning(f"依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}") + self.console.print(f"[yellow]⚠ 依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}[/yellow]") + + # 构建生成指令 + instruction = f"请根据工单描述{'修改' if action == 'modify' else '生成'}文件 '{file_path}'.\n" + instruction += f"工单内容摘要:{description}\n" + if action == "modify": + instruction += "请在现有代码基础上进行修改,保持原有风格和功能不变。" + else: + instruction += "请生成完整的代码文件。" + + # 调用 generate_file + code, desc, commands = self.generate_file( + file_path, + instruction, + dep_paths, + existing_content=existing, + output_format="full", + ) + logger.info(f"生成完成: {file_path} - {desc}") + + # 写入文件 + full_path.parent.mkdir(parents=True, exist_ok=True) + try: + with open(full_path, "w", encoding="utf-8") as f: + f.write(code) + logger.info(f"已写入: {full_path}") + generated_files.append(file_path) + except Exception as e: + logger.error(f"写入文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]") + # 跳过命令执行 + commands = [] + + # 执行关联命令 + for cmd in commands: + logger.info(f"准备执行命令: {cmd}") + success = self.execute_command(cmd, cwd=self.output_dir) + if not success: + logger.warning(f"命令执行失败,但继续处理: {cmd}") + + # 步骤3: 更新 design.json + if generated_files: + """ + try: + self._update_design(generated_files, change_plan.get("design_updates", {})) + self.console.print("[green]✅ design.json 已更新[/green]") + except Exception as e: + logger.error(f"更新design.json失败: {e}") + self.console.print(f"[bold red]❌ 更新design.json失败: {e}[/bold red]") + """ + logger.info(f'change_plan: {change_plan}') + self._update_design(generated_files, change_plan.get("design_updates", {})) + self.console.print("[green]✅ design.json 已更新[/green]") + + + self.console.print(f"[bold green]🎉 {issue_type} 处理完成![/bold green]") + return True + + + def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]: + """ + 调用 LLM 分析工单,返回结构化变更计划 + """ + system_prompt = ( + "你是一个软件架构师。根据用户提供的工单内容和现有项目设计文件(design.json)," + "分析需要进行的代码变更。返回严格的 JSON 对象,包含以下字段:\n" + "- affected_files: 数组,每个元素为一个对象,包含:\n" + " - path: 文件路径(相对于项目根目录)\n" + " - action: 'create' 或 'modify'\n" + " - description: 对此文件变更的简短描述\n" + " - dependencies: 此文件可能依赖的其他文件路径列表(可选)\n" + "- design_updates: 对象,描述对 design.json 的更新,例如新增的文件条目、修改的摘要等(可选)\n" + "注意:仅返回 JSON,不要包含其他文本。" + ) + + # 将现有 design.json 内容作为上下文的一部分 + design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False) + user_prompt = ( + f"工单类型: {issue_type}\n" + f"工单内容:\n{issue_content}\n\n" + f"现有设计文件 (design.json):\n{design_str}" + ) + + result = self._call_llm(system_prompt, user_prompt, temperature=0.2) + return result + + def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]): + """ + 根据生成的变更更新 design.json + 使用 FileModel 来处理文件信息 + """ + updated = False + + # 处理新增文件 + for file_path in generated_files: + # 检查文件是否已在 design.files 中 + exists = any(f.path == file_path for f in self.design.files) + if not exists: + # 获取更新信息 + update_info = design_updates.get(file_path, {}) + + # 创建新文件条目(FileModel实例) + new_file = FileModel( + path=file_path, + summary=update_info.get("summary", "自动生成的新文件"), + dependencies=update_info.get("dependencies", []), + functions=update_info.get("functions", []), + classes=update_info.get("classes", []), + design_updates=update_info.get("design_updates", {}) + ) + self.design.files.append(new_file) + updated = True + logger.info(f"已将新文件 {file_path} 添加到 design.json") + + # 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要) + # 这里可根据实际需求扩展,当前仅处理新增文件 + + if updated: + # 保存更新后的 design.json + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info("design.json 已更新") + + def refresh_design(self) -> bool: + """ + 重新生成design.json,基于当前README内容或加载的design.json + 返回bool表示是否成功 + """ + logger.info("开始刷新design.json") + if not self.readme_content: + # 尝试读取README.md文件 + readme_path = self.output_dir / "README.md" + if readme_path.exists(): + try: + self.readme_content = self.parse_readme(readme_path) + except Exception as e: + logger.error(f"读取README.md失败,无法刷新design: {e}") + self.console.print(f"[bold red]❌ 读取README.md失败,无法刷新design: {e}[/bold red]") + return False + else: + logger.error("没有README内容,且README.md文件不存在,无法刷新design") + self.console.print("[bold red]❌ 没有README内容,且README.md文件不存在,无法刷新design[/bold red]") + return False + + try: + self.design = self.generate_design_json() + logger.info("design.json已成功重新生成") + self.console.print("[green]✅ design.json已重新生成[/green]") + return True + except Exception as e: + logger.error(f"重新生成design.json失败: {e}") + self.console.print(f"[bold red]❌ 重新生成design.json失败: {e}[/bold red]") + return False + + def update_file_entry(self, file_path: str, file_content: str) -> bool: + """ + 更新design.json中单个文件的条目,基于提供的文件内容 + 返回bool表示是否成功 + """ + logger.info(f"开始更新design.json中文件条目: {file_path}") + if not self.design: + # 加载现有design.json + design_path = self.output_dir / "design.json" + if not design_path.exists(): + logger.error(f"design.json不存在于 {self.output_dir}") + self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]") + return False + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + return False + + # 调用LLM分析文件内容,返回更新信息 + system_prompt = ( + "你是一个软件架构师。分析给定的文件内容,并返回对design.json中该文件条目的更新。" + "返回严格的JSON对象,包含以下字段:\n" + "- summary: 文件的新摘要\n" + "- dependencies: 依赖文件列表\n" + "- functions: 函数列表,每个对象有name, summary, inputs, outputs\n" + "- classes: 类列表,每个对象有name, summary, methods\n" + "注意:仅返回JSON,不要其他文本。" + ) + # 准备当前design.json中该文件的条目信息 + current_entry = None + for f in self.design.files: + if f.path == file_path: + current_entry = f.model_dump() + break + user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目(如果存在):\n{json.dumps(current_entry, indent=2) if current_entry else '无'}" + + try: + result = self._call_llm(system_prompt, user_prompt, temperature=0.2) + update_info = result + + # 查找或创建文件条目 + file_model = None + for f in self.design.files: + if f.path == file_path: + file_model = f + break + if file_model is None: + # 创建新条目 + file_model = FileModel( + path=file_path, + summary=update_info.get("summary", ""), + dependencies=update_info.get("dependencies", []), + functions=update_info.get("functions", []), + classes=update_info.get("classes", []) + ) + self.design.files.append(file_model) + logger.info(f"在design.json中创建了新文件条目: {file_path}") + else: + # 更新现有条目 + file_model.summary = update_info.get("summary", file_model.summary) + file_model.dependencies = update_info.get("dependencies", file_model.dependencies) + file_model.functions = update_info.get("functions", file_model.functions) + file_model.classes = update_info.get("classes", file_model.classes) + logger.info(f"更新了design.json中的文件条目: {file_path}") + + # 保存更新后的design.json + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info(f"design.json已更新,文件条目: {file_path}") + self.console.print(f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]") + return True + except Exception as e: + logger.error(f"更新文件条目失败: {e}") + self.console.print(f"[bold red]❌ 更新文件条目失败: {e}[/bold red]") + return False + + def sync_readme(self) -> bool: + """ + 同步README.md和design.json,确保内容一致性 + 返回bool表示是否成功 + """ + logger.info("开始同步README.md和design.json") + # 读取README.md + readme_path = self.output_dir / "README.md" + if not readme_path.exists(): + logger.error(f"README.md不存在于 {self.output_dir}") + self.console.print(f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]") + return False + try: + with open(readme_path, "r", encoding="utf-8") as f: + readme_content = f.read() + except Exception as e: + logger.error(f"读取README.md失败: {e}") + self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") + return False + + # 加载design.json + design_path = self.output_dir / "design.json" + if not design_path.exists(): + logger.error(f"design.json不存在于 {self.output_dir}") + self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]") + return False + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + return False + + # 调用LLM比较和同步 + system_prompt = ( + "你是一个软件架构师。比较README.md内容和design.json,识别不一致之处,并建议更新。" + "返回严格的JSON对象,包含以下字段:\n" + "- needs_update: bool, 是否需要更新\n" + "- update_type: 'readme' 或 'design' 或 'both', 指示哪个需要更新\n" + "- updates: 对象,描述具体的更新内容\n" + "注意:仅返回JSON,不要其他文本。" + ) + user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}" + + try: + result = self._call_llm(system_prompt, user_prompt, temperature=0.2) + needs_update = result.get("needs_update", False) + if not needs_update: + logger.info("README.md和design.json已同步,无需更新") + self.console.print("[green]✅ README.md和design.json已同步,无需更新[/green]") + return True + + update_type = result.get("update_type", "") + updates = result.get("updates", {}) + if update_type == "readme": + # 更新README.md + new_readme = updates.get("new_readme", readme_content) + with open(readme_path, "w", encoding="utf-8") as f: + f.write(new_readme) + logger.info("已更新README.md") + self.console.print("[green]✅ README.md已更新[/green]") + elif update_type == "design": + # 更新design.json + new_design_data = updates.get("new_design", design.model_dump()) + design = DesignModel(**new_design_data) + with open(design_path, "w", encoding="utf-8") as f: + json.dump(new_design_data, f, indent=2, ensure_ascii=False) + logger.info("已更新design.json") + self.console.print("[green]✅ design.json已更新[/green]") + elif update_type == "both": + # 更新两者 + new_readme = updates.get("new_readme", readme_content) + new_design_data = updates.get("new_design", design.model_dump()) + with open(readme_path, "w", encoding="utf-8") as f: + f.write(new_readme) + design = DesignModel(**new_design_data) + with open(design_path, "w", encoding="utf-8") as f: + json.dump(new_design_data, f, indent=2, ensure_ascii=False) + logger.info("已同步更新README.md和design.json") + self.console.print("[green]✅ README.md和design.json已同步更新[/green]") + else: + logger.warning(f"未知的update_type: {update_type}") + self.console.print(f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]") + return False + return True + except Exception as e: + logger.error(f"同步README.md失败: {e}") + self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") + return False +``` diff --git a/src/llm_codegen/cli.py b/src/llm_codegen/cli.py index 4fddbe7..380478b 100644 --- a/src/llm_codegen/cli.py +++ b/src/llm_codegen/cli.py @@ -13,7 +13,10 @@ from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn from loguru import logger -from .core import CodeGenerator +from .core import BaseGenerator +from .init_generator import InitGenerator +from .enhance_generator import EnhanceGenerator +from .fix_generator import FixGenerator from .checker import Checker app = typer.Typer(help="基于LLM的自动化代码生成与维护工具") @@ -71,7 +74,7 @@ def init( console=console, ) as progress: task_id = progress.add_task("正在初始化项目...", total=1) # 修改:设置总任务数为1以控制进度显示 - generator = CodeGenerator( + generator = InitGenerator( api_key=api_key, base_url=base_url, model=model, @@ -142,7 +145,7 @@ def enhance( console=console, ) as progress: task_id = progress.add_task("正在增强项目...", total=1) # 修改:设置总任务数为1以控制进度显示 - generator = CodeGenerator( + generator = EnhanceGenerator( api_key=api_key, base_url=base_url, model=model, @@ -150,7 +153,7 @@ def enhance( log_file=log_file_path, max_concurrency=max_concurrency, ) - success = generator.process_issue(issue_content, issue_type="enhance") + success = generator.process_enhance(issue_file, output_format="full") if success: progress.update(task_id, completed=1, description="增强处理完成") # 修改:成功时更新完成状态 else: @@ -213,7 +216,7 @@ def fix( console=console ) as progress: task_id = progress.add_task("正在修复项目...", total=1) # 修改:设置总任务数为1以控制进度显示 - generator = CodeGenerator( + generator = FixGenerator( api_key=api_key, base_url=base_url, model=model, @@ -221,7 +224,7 @@ def fix( log_file=log_file_path, max_concurrency=max_concurrency, ) - success = generator.process_issue(issue_content, issue_type="fix") + success = generator.process_fix(issue_file, output_format="full") if success: progress.update(task_id, completed=1, description="修复处理完成") # 修改:成功时更新完成状态 else: @@ -285,7 +288,7 @@ def design( console=console, ) as progress: task_id = progress.add_task("正在生成design.json...", total=1) # 可选:保持现有风格,但工单未要求修改此命令 - generator = CodeGenerator( + generator = BaseGenerator( api_key=api_key, base_url=base_url, model=model, @@ -330,7 +333,7 @@ def check( log_file_path = init_logging(output_dir, log_file, command_name="check") try: - generator = CodeGenerator( + generator = BaseGenerator( api_key=api_key, base_url=base_url, model=model, diff --git a/src/llm_codegen/core.py b/src/llm_codegen/core.py index c38ff94..e3d029c 100644 --- a/src/llm_codegen/core.py +++ b/src/llm_codegen/core.py @@ -19,8 +19,8 @@ from .models import DesignModel, StateModel, FileModel, FileStatus # 添加 Fil from .diff_applier import parse_diff, apply_diff -class CodeGenerator: - """代码生成器,封装所有逻辑,支持设计层、断点续写和命令执行""" +class BaseGenerator: + """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" def __init__( self, @@ -80,8 +80,6 @@ class CodeGenerator: 调用LLM并返回解析后的JSON """ logger.debug(f"调用LLM,模型: {self.model}") - logger.debug(f"System: {system_prompt[:200]}...") - logger.debug(f"User: {user_prompt[:200]}...") try: response = self.client.chat.completions.create( @@ -577,370 +575,6 @@ class CodeGenerator: self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]") return False - def run(self, readme_path: Path): - """ - 主执行流程,支持基于依赖关系的并发生成 - """ - logger.info("=" * 50) - logger.info("开始代码生成流程") - logger.info(f"README: {readme_path}") - logger.info(f"输出目录: {self.output_dir}") - - # 解析README - self.console.print("[bold yellow]🔍 正在解析README...[/bold yellow]") - try: - self.readme_content = self.parse_readme(readme_path) - except Exception as e: - logger.error(f"解析README失败,无法继续: {e}") - self.console.print(f"[bold red]❌ 解析README失败,无法继续: {e}[/bold red]") - return # 致命错误,退出 - - # 加载状态 - state = self.load_state() - if state: - self.console.print(f"[green]✅ 检测到断点状态,已生成 {len(state.generated_files)} 个文件[/green]") - self.state = state - # 从状态恢复设计,假设design.json已存在 - design_path = self.output_dir / "design.json" - if design_path.exists(): - try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - self.design = DesignModel(**design_data) - except Exception as e: - logger.error(f"加载design.json失败: {e}") - self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") - self.console.print("[bold yellow]⚠ design.json损坏,重新生成...[/bold yellow]") - try: - self.design = self.generate_design_json() - except Exception as e2: - logger.error(f"重新生成design.json失败: {e2}") - self.console.print(f"[bold red]❌ 重新生成design.json失败: {e2}[/bold red]") - return - else: - self.console.print("[bold yellow]⚠ design.json不存在,重新生成...[/bold yellow]") - try: - self.design = self.generate_design_json() - except Exception as e: - logger.error(f"生成design.json失败: {e}") - self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]") - return - else: - self.console.print("[bold yellow]📋 正在生成设计文件...[/bold yellow]") - try: - self.design = self.generate_design_json() - self.state = None - except Exception as e: - logger.error(f"生成design.json失败: {e}") - self.console.print(f"[bold red]❌ 生成design.json失败: {e}[/bold red]") - return - - # 获取项目结构 - self.console.print("[bold yellow]📋 正在分析项目结构...[/bold yellow]") - try: - files, dependencies = self.get_project_structure() - except Exception as e: - logger.error(f"获取项目结构失败: {e}") - self.console.print(f"[bold red]❌ 获取项目结构失败: {e}[/bold red]") - return - self.console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]") - - # 添加隐式依赖 - # dependencies = self._add_implicit_dependencies(files, dependencies) - # logger.info("已添加隐式依赖") - - # 拓扑排序检查依赖关系 - try: - sorted_files = self._topological_sort(files, dependencies) - logger.info(f"拓扑排序成功,文件顺序: {sorted_files}") - except ValueError as e: - logger.error(f"依赖关系错误: {e}") - self.console.print(f"[bold red]❌ 依赖关系错误: {e}[/bold red]") - return # 退出生成 - - # 断点续写:确定已生成文件 - generated_files_set = set(self.state.generated_files if self.state else []) - - # 构建DAG并计算入度 - in_degree = {file: len(dependencies.get(file, [])) for file in files} - # 初始化队列为入度为0且未生成的节点 - queue = deque([f for f in files if in_degree[f] == 0 and f not in generated_files_set]) - processed_files = set(generated_files_set) # 跟踪已处理文件 - remaining_files = set(files) - processed_files - - # 创建进度条 - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - console=self.console, - ) as progress: - self.progress = progress - total_task = progress.add_task("[cyan]整体进度...", total=len(remaining_files)) - progress.update(total_task, completed=len(processed_files) - len(generated_files_set)) - - # 初始化文件任务映射 - file_tasks = {} # 局部字典,映射文件到任务ID - - # 并发任务调度 - with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_concurrency) as executor: - futures = {} - while queue or futures: - # 提交队列中的任务 - while queue: - file = queue.popleft() - future = executor.submit(self._generate_file_task, file, dependencies.get(file, []), processed_files) - futures[future] = file - # 为每个文件添加独立进度任务并保存任务ID,添加状态显示 - task_id = progress.add_task(f"{file} - {FileStatus.GENERATING}", total=1) # 修改:添加状态 - file_tasks[file] = task_id - - # 等待任意任务完成 - done, not_done = concurrent.futures.wait(futures.keys(), return_when=concurrent.futures.FIRST_COMPLETED, timeout=1.0) - for future in done: - file = futures.pop(future) - try: - success, error_msg = future.result() - # 更新文件进度任务,根据状态更新描述 - if file in file_tasks: - if success: - progress.update(file_tasks[file], completed=1, description=f"{file} - {FileStatus.SUCCESS}") # 修改:添加状态 - progress.remove_task(file_tasks[file]) # 移除任务 - else: - # 如果失败,标记为错误状态 - progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}") # 修改:添加状态 - progress.remove_task(file_tasks[file]) - del file_tasks[file] # 清理映射 - if success: - processed_files.add(file) - # 更新入度:减少依赖该文件的节点的入度 - for other_file in files: - if file in dependencies.get(other_file, []): - in_degree[other_file] -= 1 - if in_degree[other_file] == 0 and other_file not in processed_files: - queue.append(other_file) - # 保存状态 - self.save_state(list(processed_files), dependencies) - progress.update(total_task, advance=1) # 更新整体进度 - else: - logger.error(f"文件 {file} 生成失败,错误: {error_msg}") - self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]") - # 错误处理:继续处理其他文件,但记录失败 - except Exception as e: - # 捕获 Future 中存储的异常 - logger.error(f"任务 {file} 执行时发生异常: {e}") - self.console.print(f"[bold red]❌ 任务 {file} 执行时发生异常: {e}[/bold red]") - # 将其视为失败 - success = False - error_msg = str(e) - # 然后执行和上面 `else` 分支相同的失败处理逻辑 - if file in file_tasks: - progress.update(file_tasks[file], description=f"{file} - {FileStatus.FAILED}: {error_msg}") # 修改:添加状态 - progress.remove_task(file_tasks[file]) - del file_tasks[file] # 清理映射 - logger.error(f"文件 {file} 生成失败,错误: {error_msg}") - self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]") - # 错误处理:继续处理其他文件,但记录失败 - - - logger.success("所有文件处理完成!") - # 清理状态文件 - if self.state_file.exists(): - try: - self.state_file.unlink() - logger.info("状态文件已清理") - except Exception as e: - logger.error(f"清理状态文件失败: {e}") - self.console.print(f"[bold red]❌ 清理状态文件失败: {e}[/bold red]") - - def process_issue(self, issue_content: str, issue_type: str) -> bool: - """ - 处理需求增强或 Bug 修复工单 - - Args: - issue_content: 工单文件内容(文本) - issue_type: 'enhance' 或 'fix' - - Returns: - bool: 处理是否成功 - """ - logger.info(f"开始处理 {issue_type} 工单") - self.console.print(f"[bold yellow]📋 正在分析 {issue_type} 工单...[/bold yellow]") - - # 加载现有 design.json - design_path = self.output_dir / "design.json" - if not design_path.exists(): - logger.error(f"design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。") - self.console.print(f"[bold red]❌ design.json 不存在于 {self.output_dir},请先运行 init 命令初始化项目。[/bold red]") - return False - - try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - self.design = DesignModel(**design_data) - except Exception as e: - logger.error(f"加载design.json失败: {e}") - self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") - return False - - # 加载 README 内容(如果存在) - readme_path = self.output_dir / "README.md" - if readme_path.exists(): - try: - with open(readme_path, "r", encoding="utf-8") as f: - self.readme_content = f.read() - except Exception as e: - logger.error(f"读取README.md失败: {e}") - self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") - self.readme_content = "" - else: - self.readme_content = "" - - # 步骤1: 分析工单,生成变更计划 - try: - change_plan = self._analyze_issue(issue_content, issue_type) - except Exception as e: - logger.error(f"分析工单失败: {e}") - self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") - return False - if not change_plan: - logger.error("无法生成变更计划") - self.console.print("[bold red]❌ 无法生成变更计划[/bold red]") - return False - - affected_files = change_plan.get("affected_files", []) - if not affected_files: - logger.warning("工单分析结果未指定任何受影响文件") - self.console.print("[yellow]⚠ 工单分析结果未指定任何受影响文件[/yellow]") - return True # 无变更 - - self.console.print(f"[green]✅ 分析完成,将处理 {len(affected_files)} 个文件[/green]") - - # 添加依赖关系排序:解析 design.json 中的依赖,确保依赖项先于被依赖项处理 - # 构建依赖关系字典用于拓扑排序 - dependencies_dict = {} - for file_info in affected_files: - path = file_info["path"] - # 从 design.json 中获取依赖关系 - deps = [] - for f in self.design.files: - if f.path == path: - deps = f.dependencies - break - # 只考虑在 affected_files 中的依赖文件,以确保内部依赖顺序 - affected_paths_set = set(info["path"] for info in affected_files) - filtered_deps = [dep for dep in deps if dep in affected_paths_set] - dependencies_dict[path] = filtered_deps - - # 对 affected_files 进行拓扑排序 - try: - sorted_paths = self._topological_sort([info["path"] for info in affected_files], dependencies_dict) - except ValueError as e: - logger.error(f"依赖关系排序失败: {e}") - self.console.print(f"[bold red]❌ 依赖关系排序失败: {e}[/bold red]") - return False # 排序失败,处理中止 - - # 重新排序 affected_files 基于 sorted_paths - file_info_map = {info["path"]: info for info in affected_files} - sorted_affected_files = [file_info_map[path] for path in sorted_paths] - - # 步骤2: 逐个处理文件(按依赖顺序) - generated_files = [] - for file_info in sorted_affected_files: - file_path = file_info["path"] - action = file_info.get("action", "modify") # modify 或 create - description = file_info.get("description", "") - dependencies = file_info.get("dependencies", []) - - logger.info(f"处理文件: {file_path} (操作: {action})") - - # 读取现有内容(如果是修改) - existing = None - full_path = self.output_dir / file_path - if action == "modify" and full_path.exists(): - try: - with open(full_path, "r", encoding="utf-8") as f: - existing = f.read() - except Exception as e: - logger.error(f"读取文件 {file_path} 失败: {e}") - self.console.print(f"[bold red]❌ 读取文件 {file_path} 失败: {e}[/bold red]") - existing = None # 如果读取失败,按新文件处理 - elif action == "create" and full_path.exists(): - logger.warning(f"文件 {file_path} 已存在,将覆盖") - self.console.print(f"[yellow]⚠ 文件 {file_path} 已存在,将覆盖[/yellow]") - existing = None # 创建模式,即使存在也按新文件处理 - - # 收集实际存在的依赖文件 - dep_paths = [] - missing_deps = [] - for dep in dependencies: - dep_full = self.output_dir / dep - if dep_full.exists(): - dep_paths.append(dep) - else: - missing_deps.append(dep) - if missing_deps: - logger.warning(f"依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}") - self.console.print(f"[yellow]⚠ 依赖文件缺失,将不使用这些文件作为上下文: {missing_deps}[/yellow]") - - # 构建生成指令 - instruction = f"请根据工单描述{'修改' if action == 'modify' else '生成'}文件 '{file_path}'.\n" - instruction += f"工单内容摘要:{description}\n" - if action == "modify": - instruction += "请在现有代码基础上进行修改,保持原有风格和功能不变。" - else: - instruction += "请生成完整的代码文件。" - - # 调用 generate_file - code, desc, commands = self.generate_file( - file_path, - instruction, - dep_paths, - existing_content=existing, - output_format="full", - ) - logger.info(f"生成完成: {file_path} - {desc}") - - # 写入文件 - full_path.parent.mkdir(parents=True, exist_ok=True) - try: - with open(full_path, "w", encoding="utf-8") as f: - f.write(code) - logger.info(f"已写入: {full_path}") - generated_files.append(file_path) - except Exception as e: - logger.error(f"写入文件 {file_path} 失败: {e}") - self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]") - # 跳过命令执行 - commands = [] - - # 执行关联命令 - for cmd in commands: - logger.info(f"准备执行命令: {cmd}") - success = self.execute_command(cmd, cwd=self.output_dir) - if not success: - logger.warning(f"命令执行失败,但继续处理: {cmd}") - - # 步骤3: 更新 design.json - if generated_files: - """ - try: - self._update_design(generated_files, change_plan.get("design_updates", {})) - self.console.print("[green]✅ design.json 已更新[/green]") - except Exception as e: - logger.error(f"更新design.json失败: {e}") - self.console.print(f"[bold red]❌ 更新design.json失败: {e}[/bold red]") - """ - logger.info(f'change_plan: {change_plan}') - self._update_design(generated_files, change_plan.get("design_updates", {})) - self.console.print("[green]✅ design.json 已更新[/green]") - - - self.console.print(f"[bold green]🎉 {issue_type} 处理完成![/bold green]") - return True - - def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]: """ 调用 LLM 分析工单,返回结构化变更计划 diff --git a/src/llm_codegen/enhance_generator.py b/src/llm_codegen/enhance_generator.py new file mode 100644 index 0000000..7d6a11d --- /dev/null +++ b/src/llm_codegen/enhance_generator.py @@ -0,0 +1,180 @@ +import json +from pathlib import Path +from typing import Any, Dict, List, Optional +from loguru import logger +from rich.console import Console + +from .core import BaseGenerator + + +class EnhanceGenerator(BaseGenerator): + """ + 增强生成器类,继承自 BaseGenerator,专门处理 enhance 命令逻辑。 + 用于根据需求工单(feature.issue)对现有项目进行功能增强。 + """ + + def __init__( + self, + api_key: Optional[str] = None, + base_url: str = "https://api.deepseek.com", + model: str = "deepseek-reasoner", + output_dir: str = "./generated", + log_file: Optional[str] = None, + max_concurrency: int = 4 + ): + """ + 初始化 EnhanceGenerator,继承 BaseGenerator 的配置。 + + Args: + api_key: OpenAI API密钥,默认从环境变量 DEEPSEEK_APIKEY 读取 + base_url: API基础URL + model: 使用的模型 + output_dir: 输出根目录 + log_file: 日志文件路径,默认自动生成 + max_concurrency: 最大并发数 + """ + super().__init__( + api_key=api_key, + base_url=base_url, + model=model, + output_dir=output_dir, + log_file=log_file, + max_concurrency=max_concurrency + ) + logger.info("EnhanceGenerator 初始化完成") + + def process_enhance(self, issue_file_path: Path, output_format: str = "full") -> bool: + """ + 处理 enhance 命令的核心逻辑:读取工单,分析变更,生成或修改文件,更新设计。 + + Args: + issue_file_path: 需求工单文件路径(如 feature.issue) + output_format: 输出格式,'full' 或 'diff',默认为 'full' + + Returns: + bool: 处理是否成功 + """ + logger.info(f"开始处理增强工单: {issue_file_path}") + self.console.print(f"[bold blue]🔧 处理增强工单: {issue_file_path}[/bold blue]") + + # 1. 读取工单文件内容 + try: + with open(issue_file_path, 'r', encoding='utf-8') as f: + issue_content = f.read() + logger.debug(f"工单内容长度: {len(issue_content)} 字符") + except Exception as e: + logger.error(f"读取工单文件失败: {e}") + self.console.print(f"[bold red]❌ 读取工单文件失败: {e}[/bold red]") + return False + + # 2. 调用 LLM 分析工单,获取变更计划 + try: + analysis_result = self._analyze_issue(issue_content, "feature") + except Exception as e: + logger.error(f"分析工单失败: {e}") + self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") + return False + + affected_files = analysis_result.get("affected_files", []) + design_updates = analysis_result.get("design_updates", {}) + + if not affected_files: + logger.warning("工单分析未发现需要变更的文件") + self.console.print("[yellow]⚠ 工单分析未发现需要变更的文件[/yellow]") + return True # 无变更,视为成功 + + logger.info(f"分析到 {len(affected_files)} 个受影响文件") + self.console.print(f"[green]📋 分析到 {len(affected_files)} 个受影响文件[/green]") + + # 3. 对受影响文件进行拓扑排序,基于依赖关系 + file_paths = [af["path"] for af in affected_files] + dependencies = {af["path"]: af.get("dependencies", []) for af in affected_files} + try: + sorted_paths = self._topological_sort(file_paths, dependencies) + logger.debug(f"拓扑排序结果: {sorted_paths}") + except ValueError as e: + logger.error(f"拓扑排序失败,检测到循环依赖: {e}") + self.console.print(f"[bold red]❌ 拓扑排序失败,检测到循环依赖: {e}[/bold red]") + return False + + # 根据排序顺序获取文件信息 + sorted_file_infos = [] + for path in sorted_paths: + for af in affected_files: + if af["path"] == path: + sorted_file_infos.append(af) + break + + generated_files = [] + for file_info in sorted_file_infos: + file_path = file_info["path"] + action = file_info.get("action", "modify") + description = file_info.get("description", "") + deps = file_info.get("dependencies", []) + + # 4. 根据 action 处理文件:读取现有内容或创建新文件 + existing_content = None + if action == "modify": + full_path = self.output_dir / file_path + if full_path.exists(): + try: + with open(full_path, 'r', encoding='utf-8') as f: + existing_content = f.read() + logger.debug(f"读取现有文件内容: {file_path}") + except Exception as e: + logger.error(f"读取现有文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 读取现有文件 {file_path} 失败: {e}[/bold red]") + # 容错处理,视为创建新文件 + existing_content = "" + action = "create" + else: + logger.warning(f"文件 {file_path} 不存在,将创建新文件") + self.console.print(f"[yellow]⚠ 文件 {file_path} 不存在,将创建新文件[/yellow]") + action = "create" + + # 5. 调用 generate_file 生成或修改文件 + instruction = ( + f"根据需求工单 '{issue_file_path.name}' 和变更描述 '{description}'," + f"{action} 文件 '{file_path}'。请确保符合项目设计。" + ) + try: + code, desc, commands = self.generate_file( + file_path=file_path, + prompt_instruction=instruction, + dependency_files=deps, + existing_content=existing_content, + output_format=output_format + ) + # generate_file 内部已写入文件并执行命令 + generated_files.append(file_path) + logger.info(f"文件处理完成: {file_path} - {desc}") + self.console.print(f"[green]✅ 文件处理完成: {file_path} - {desc}[/green]") + except Exception as e: + logger.error(f"生成文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 生成文件 {file_path} 失败: {e}[/bold red]") + # 继续处理其他文件,但记录失败 + continue + + # 6. 更新 design.json 以反映变更 + if generated_files: + try: + self._update_design(generated_files, design_updates) + logger.info(f"已更新 design.json,包含 {len(generated_files)} 个文件变更") + self.console.print(f"[green]✅ 已更新 design.json[/green]") + except Exception as e: + logger.error(f"更新 design.json 失败: {e}") + self.console.print(f"[bold red]❌ 更新 design.json 失败: {e}[/bold red]") + # 不返回 False,因为文件已生成,仅记录错误 + + # 7. 可选:执行全局命令或检查(如有需要,可从 design.json 读取) + # 此处可根据设计添加,例如运行检查工具 + if self.design and self.design.commands: + logger.info("开始执行项目命令") + for cmd in self.design.commands: + success = self.execute_command(cmd, cwd=self.output_dir) + if not success: + logger.warning(f"命令执行失败,但继续: {cmd}") + + logger.info("增强处理流程完成") + self.console.print("[bold green]🎉 增强处理流程完成[/bold green]") + return True diff --git a/src/llm_codegen/fix_generator.py b/src/llm_codegen/fix_generator.py new file mode 100644 index 0000000..8b6f121 --- /dev/null +++ b/src/llm_codegen/fix_generator.py @@ -0,0 +1,146 @@ +import json +from pathlib import Path +from typing import List, Optional +from .core import BaseGenerator +from .models import OutputFormat + + +class FixGenerator(BaseGenerator): + """处理 Bug 修复逻辑的生成器类,继承自 BaseGenerator。""" + + def __init__(self, **kwargs): + """初始化 FixGenerator,继承基类参数。""" + super().__init__(**kwargs) + + def process_fix( + self, + bug_issue_path: Path, + output_format: OutputFormat = OutputFormat.FULL, + ) -> bool: + """ + 处理 fix 命令逻辑:读取 Bug 工单,分析变更,生成并应用修复代码。 + + Args: + bug_issue_path: Bug 工单文件路径(如 bug.issue) + output_format: 输出格式,默认为 FULL,支持 DIFF 用于差异生成 + + Returns: + bool: 修复是否成功 + """ + # 读取 Bug 工单文件内容 + try: + with open(bug_issue_path, "r", encoding="utf-8") as f: + issue_content = f.read() + except Exception as e: + self.logger.error(f"读取 Bug 工单文件失败: {e}") + self.console.print(f"[bold red]❌ 读取 Bug 工单文件失败: {e}[/bold red]") + return False + + # 调用 LLM 分析工单,获取变更计划 + try: + analysis = self._analyze_issue(issue_content, "bug") + except Exception as e: + self.logger.error(f"分析工单失败: {e}") + self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") + return False + + affected_files = analysis.get("affected_files", []) + design_updates = analysis.get("design_updates", {}) + + successful_files = [] + for file_info in affected_files: + file_path = file_info.get("path") + action = file_info.get("action") # 'create' 或 'modify' + description = file_info.get("description", "") + dependencies = file_info.get("dependencies", []) + + if action == "modify": + # 修改现有文件:读取当前内容 + full_path = self.output_dir / file_path + if not full_path.exists(): + self.logger.error(f"文件不存在,无法修改: {file_path}") + self.console.print(f"[bold red]❌ 文件不存在,无法修改: {file_path}[/bold red]") + continue + + try: + with open(full_path, "r", encoding="utf-8") as f: + existing_content = f.read() + except Exception as e: + self.logger.error(f"读取文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 读取文件 {file_path} 失败: {e}[/bold red]") + continue + + # 生成修复代码 + instruction = ( + f"根据 Bug 工单修复文件 '{file_path}'。工单摘要: {issue_content[:200]}..." + ) + code, desc, commands = self.generate_file( + file_path=file_path, + prompt_instruction=instruction, + dependency_files=dependencies, + existing_content=existing_content, + output_format=output_format, + ) + + # 写入修复后的内容 + try: + with open(full_path, "w", encoding="utf-8") as f: + f.write(code) + self.logger.info(f"已修复文件: {file_path} - {desc}") + successful_files.append(file_path) + except Exception as e: + self.logger.error(f"写入文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]") + continue + + # 执行相关命令 + for cmd in commands: + self.execute_command(cmd, cwd=self.output_dir) + + # 更新 design.json 中的文件条目 + self.update_file_entry(file_path, code) + + elif action == "create": + # 创建新文件:无需现有内容 + instruction = ( + f"根据 Bug 工单创建文件 '{file_path}'。工单摘要: {issue_content[:200]}..." + ) + code, desc, commands = self.generate_file( + file_path=file_path, + prompt_instruction=instruction, + dependency_files=dependencies, + existing_content=None, + output_format=output_format, + ) + + # 写入新文件 + full_path = self.output_dir / file_path + full_path.parent.mkdir(parents=True, exist_ok=True) + try: + with open(full_path, "w", encoding="utf-8") as f: + f.write(code) + self.logger.info(f"已创建文件: {file_path} - {desc}") + successful_files.append(file_path) + except Exception as e: + self.logger.error(f"创建文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 创建文件 {file_path} 失败: {e}[/bold red]") + continue + + for cmd in commands: + self.execute_command(cmd, cwd=self.output_dir) + + # 添加新条目到 design.json + self._update_design([file_path], design_updates) + + # 如果有设计更新,整体更新 design.json + if design_updates and successful_files: + self._update_design(successful_files, design_updates) + + if successful_files: + self.logger.info(f"修复成功,处理了 {len(successful_files)} 个文件") + self.console.print(f"[green]✅ 修复成功,处理了 {len(successful_files)} 个文件[/green]") + return True + else: + self.logger.error("修复失败,没有文件被成功处理") + self.console.print("[bold red]❌ 修复失败,没有文件被成功处理[/bold red]") + return False diff --git a/src/llm_codegen/init_generator.py b/src/llm_codegen/init_generator.py new file mode 100644 index 0000000..e4d63c0 --- /dev/null +++ b/src/llm_codegen/init_generator.py @@ -0,0 +1,108 @@ +import json +from pathlib import Path +from typing import Optional +from .core import BaseGenerator +from loguru import logger # 确保日志可用 + + +class InitGenerator(BaseGenerator): + """处理 init 命令的生成器类,继承自 BaseGenerator,用于从 README 初始化项目。""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: str = "https://api.deepseek.com", + model: str = "deepseek-reasoner", + output_dir: str = "./generated", + log_file: Optional[str] = None, + max_concurrency: int = 4 + ): + """初始化 InitGenerator。 + + Args: + api_key: OpenAI API密钥,默认从环境变量 DEEPSEEK_APIKEY 读取。 + base_url: API基础URL。 + model: 使用的模型。 + output_dir: 输出根目录。 + log_file: 日志文件路径。 + max_concurrency: 最大并发数。 + """ + super().__init__(api_key, base_url, model, output_dir, log_file, max_concurrency) + + def run(self, readme_path: Path) -> None: + """处理 init 命令逻辑:根据 README.md 初始化项目。 + + Args: + readme_path: README 文件路径。 + """ + logger.info(f"开始初始化项目,README路径: {readme_path}") + self.console.print(f"[bold]🚀 开始初始化项目...[/bold]") + + # 1. 读取 README + self.readme_content = self.parse_readme(readme_path) + logger.info("README读取完成") + + # 2. 生成 design.json + self.design = self.generate_design_json() + logger.info("design.json生成完成") + + # 3. 获取文件列表和依赖 + files, dependencies = self.get_project_structure() + logger.info(f"获取到 {len(files)} 个待生成文件") + + # 4. 拓扑排序以确保依赖顺序 + try: + sorted_files = self._topological_sort(files, dependencies) + logger.info(f"拓扑排序完成,文件顺序: {sorted_files}") + except ValueError as e: + logger.error(f"拓扑排序失败: {e}") + self.console.print(f"[bold red]❌ 拓扑排序失败: {e}[/bold red]") + return + + # 5. 生成每个文件 + generated_files = set() + for i, file_path in enumerate(sorted_files, 1): + logger.info(f"正在生成文件 {i}/{len(sorted_files)}: {file_path}") + self.console.print(f"[cyan]生成文件 {i}/{len(sorted_files)}: {file_path}[/cyan]") + + # 准备指令 + instruction = f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" + # 收集已生成的依赖文件 + available_deps = [dep for dep in dependencies.get(file_path, []) if dep in generated_files] + + # 生成文件 + code, desc, commands = self.generate_file( + file_path, instruction, available_deps, output_format="full" + ) + logger.info(f"文件生成完成: {file_path} - {desc}") + + # 写入文件 + output_path = self.output_dir / file_path + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(code) + logger.info(f"文件已写入: {output_path}") + + generated_files.add(file_path) + + # 执行关联的命令 + for cmd in commands: + logger.info(f"执行命令: {cmd}") + success = self.execute_command(cmd, cwd=self.output_dir) + if not success: + logger.warning(f"命令执行失败,但继续处理: {cmd}") + self.console.print(f"[yellow]⚠ 命令执行失败: {cmd}[/yellow]") + + # 6. 保存状态 + self.save_state(list(generated_files), dependencies) + logger.info("状态已保存") + + # 7. 可选:执行项目级命令,如安装依赖和运行测试 + if self.design.commands: + logger.info("执行项目级命令") + for cmd in self.design.commands: + logger.info(f"执行项目命令: {cmd}") + self.execute_command(cmd, cwd=self.output_dir) + + self.console.print("[green]✅ 项目初始化完成![/green]") + logger.info("项目初始化流程完成")