From 5d541fd3b8cb922c26e5883b6e0fb94685dc708a Mon Sep 17 00:00:00 2001 From: songsenand Date: Fri, 20 Mar 2026 11:52:00 +0800 Subject: [PATCH] =?UTF-8?q?refactor(=E9=A1=B9=E7=9B=AE=E7=BB=93=E6=9E=84):?= =?UTF-8?q?=20=E9=87=8D=E6=9E=84=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84?= =?UTF-8?q?=EF=BC=8C=E6=8B=86=E5=88=86=E6=A0=B8=E5=BF=83=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E4=B8=BA=E7=8B=AC=E7=AB=8B=E7=94=9F=E6=88=90=E5=99=A8=E4=BB=A5?= =?UTF-8?q?=E6=8F=90=E9=AB=98=E5=8F=AF=E7=BB=B4=E6=8A=A4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 19 +- design.json | 53 +- issues/refactor-split-core.issue | 827 ++++++++++++++++++++++++++- src/llm_codegen/cli.py | 1 - src/llm_codegen/command_executor.py | 51 ++ src/llm_codegen/core.py | 324 ++++------- src/llm_codegen/dependency_sorter.py | 114 ++++ src/llm_codegen/design_manager.py | 106 ++++ src/llm_codegen/file_operations.py | 148 +++++ src/llm_codegen/init_generator.py | 6 +- src/llm_codegen/llm_client.py | 147 +++++ src/llm_codegen/state_manager.py | 78 +++ 12 files changed, 1652 insertions(+), 222 deletions(-) create mode 100644 src/llm_codegen/command_executor.py create mode 100644 src/llm_codegen/dependency_sorter.py create mode 100644 src/llm_codegen/design_manager.py create mode 100644 src/llm_codegen/file_operations.py create mode 100644 src/llm_codegen/llm_client.py create mode 100644 src/llm_codegen/state_manager.py diff --git a/README.md b/README.md index 778c4c1..55adee8 100644 --- a/README.md +++ b/README.md @@ -199,7 +199,6 @@ llm-codegen design project_readme.md -o ./my_design 5. 
执行检查与修复。 - ## 📝 工单模板 ### 需求工单 (`feature.issue`) @@ -255,6 +254,24 @@ uv pip install -e ".[dev]" ## 项目结构 +### 工具代码结构 + +本项目(LLM 代码生成工具)的代码结构已重构,核心模块被拆分为多个专门的生成器,以提高模块化和可维护性。主要模块包括: + +- **src/llm_codegen/cli.py**: 命令行接口,使用 Typer 定义命令,分发到相应的生成器。 +- **src/llm_codegen/core.py**: 基础生成逻辑,包含 BaseGenerator 类,提供通用方法如调用 LLM 和文件操作。 +- **src/llm_codegen/init_generator.py**: 初始化命令生成器,处理 `init` 命令逻辑,继承自 BaseGenerator,负责从 README.md 生成完整项目。 +- **src/llm_codegen/enhance_generator.py**: 增强命令生成器,处理 `enhance` 命令逻辑,继承自 BaseGenerator,负责根据需求工单增量添加功能。 +- **src/llm_codegen/fix_generator.py**: 修复命令生成器,处理 `fix` 命令逻辑,继承自 BaseGenerator,负责根据 Bug 工单自动修复缺陷。 +- **src/llm_codegen/design_generator.py**: 设计文件生成器,处理 `design` 命令逻辑,生成中间设计文件 design.json。 +- **src/llm_codegen/utils.py**: 工具函数,如危险命令判断和文件操作。 +- **src/llm_codegen/models.py**: 数据模型,使用 Pydantic 定义数据结构。 +- **src/llm_codegen/diff_applier.py**: 应用代码差异的工具模块(如有)。 + +**设计思想**: 通过将核心逻辑拆分为独立的生成器,每个生成器专注于一个特定任务(初始化、增强、修复、设计),使得代码更易于维护和扩展。BaseGenerator 提供共享功能,减少代码重复,并确保一致性。 + +### 生成的项目结构 + 生成的项目将包含以下文件和目录: ```txt . 
diff --git a/design.json b/design.json index 01b5600..2213c8b 100644 --- a/design.json +++ b/design.json @@ -3,7 +3,7 @@ "version": "1.0.0", "description": "一个基于大语言模型的智能代码生成与维护工具,支持自动生成、增量添加功能和自动修复Bug。", "files": [ - { + { "path": "README.md", "summary": "项目说明文档,包含项目概述、功能介绍和使用说明", "dependencies": [ @@ -13,7 +13,6 @@ "classes": [], "design_updates": {} }, - { "path": "pyproject.toml", "summary": "项目元数据、依赖配置和脚本入口", @@ -26,7 +25,7 @@ "path": "src/llm_codegen/__init__.py", "summary": "包初始化文件", "dependencies": [ - "src/llm_codegen/core.py" + "src/llm_codegen/core.py" ], "functions": [], "classes": [], @@ -304,6 +303,54 @@ "functions": [], "classes": [], "design_updates": {} + }, + { + "path": "src/llm_codegen/llm_client.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} + }, + { + "path": "src/llm_codegen/file_operations.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} + }, + { + "path": "src/llm_codegen/command_executor.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} + }, + { + "path": "src/llm_codegen/dependency_sorter.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} + }, + { + "path": "src/llm_codegen/design_manager.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} + }, + { + "path": "src/llm_codegen/state_manager.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} } ], "commands": [ diff --git a/issues/refactor-split-core.issue b/issues/refactor-split-core.issue index d816bc5..fa3bbbf 100644 --- a/issues/refactor-split-core.issue +++ b/issues/refactor-split-core.issue @@ -30,4 +30,829 @@ acceptance_criteria: - README.md 中的“项目结构”部分更新为新文件列表,并简要说明各模块职责。 - 日志记录、进度显示、错误提示等用户体验相关功能保持不变。 - 代码风格符合项目规范(通过 
black、pylint 等检查)。 - - 生成对应的单元测试。 \ No newline at end of file + - 生成对应的单元测试。 + + + > 原core.py内容为: + ```python + import json +import os +import subprocess +import sys +import concurrent.futures +import pendulum +from typing import List, Dict, Optional, Any, Tuple +from pathlib import Path +from collections import deque +import threading + +from rich.console import Console +from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID +from loguru import logger +from openai import OpenAI + +from .utils import is_dangerous_command +from .models import ( + DesignModel, + StateModel, + FileModel, + FileStatus, +) # 添加 FileStatus 导入 + + +class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 + """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: str = "https://api.deepseek.com", + model: str = "deepseek-reasoner", + output_dir: str = "./generated", + log_file: Optional[str] = None, + max_concurrency: int = 4, + ): + """ + 初始化生成器 + + Args: + api_key: OpenAI API密钥,默认从环境变量DEEPSEEK_APIKEY读取 + base_url: API基础URL + model: 使用的模型 + output_dir: 输出根目录 + log_file: 日志文件路径,默认自动生成 + """ + self.api_key = api_key or os.getenv("DEEPSEEK_APIKEY") + if not self.api_key: + raise ValueError("必须提供API密钥,或设置环境变量DEEPSEEK_APIKEY") + + self.client = OpenAI(api_key=self.api_key, base_url=base_url) + self.model = model + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + self.state_file = self.output_dir / ".llm_generator_state.json" + self.console = Console() # 添加console实例用于rich打印 + self._state_lock = threading.Lock() + + self.max_concurrency = max_concurrency + + # 配置日志 + if log_file is None: + log_file = self.output_dir / "generator.log" + logger.remove() # 移除默认handler + logger.add(sys.stderr, level="WARNING") # 控制台输出WARNING及以上 + logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG + logger.info(f"日志已初始化,保存至: {log_file}") + + self.readme_content = None + self.design: 
Optional[DesignModel] = None + self.state: Optional[StateModel] = None + self.progress: Optional[Progress] = None + self.tasks: Dict[str, TaskID] = {} # 任务ID映射 + + def _call_llm( + self, + system_prompt: str, + user_prompt: str, + temperature: float = 0.2, + expect_json: bool = True, + ) -> Dict[str, Any]: + """ + 调用LLM并返回解析后的JSON + """ + logger.debug(f"调用LLM,模型: {self.model}") + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + temperature=temperature, + response_format={"type": "json_object"} if expect_json else None, + ) + + message = response.choices[0].message + content = message.content + + # 记录思考过程(如果存在) + reasoning_content = None + if hasattr(message, "reasoning_content") and message.reasoning_content: + reasoning_content = message.reasoning_content + logger.info("模型思考过程已记录") + + # 创建响应目录 + responses_dir = self.output_dir / "llm_responses" + responses_dir.mkdir(parents=True, exist_ok=True) + + # 生成文件名(使用当前时间) + timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS") + response_file = responses_dir / f"response_{timestamp}.json" + + # 保存响应到JSON文件 + response_data = { + "timestamp": timestamp, + "model": self.model, + "content": content, + "reasoning_content": reasoning_content, + "system_prompt": system_prompt, + "user_prompt": user_prompt, + "temperature": temperature, + "expect_json": expect_json, + } + + with open(response_file, "w", encoding="utf-8") as f: + json.dump(response_data, f, indent=2, ensure_ascii=False) + + logger.debug(f"LLM原始响应: {response_file.name}") + + if expect_json: + result = json.loads(content) + else: + result = {"content": content} + + return result + + except json.JSONDecodeError as e: + logger.error(f"JSON解析失败: {e}") + self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]") + raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}") + except Exception as e: + logger.error(f"LLM调用失败: {e}") + 
self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") + raise + + def parse_readme(self, readme_path: Path) -> str: + """ + 读取README文件内容 + """ + logger.info(f"读取README文件: {readme_path}") + try: + with open(readme_path, "r", encoding="utf-8") as f: + content = f.read() + logger.debug(f"README内容长度: {len(content)} 字符") + return content + except Exception as e: + logger.error(f"读取README失败: {e}") + self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]") + raise + + def generate_design_json(self) -> DesignModel: + """ + 调用LLM生成design.json内容,并解析为DesignModel + """ + system_prompt = ( + "你是一个软件架构师。请根据README描述,生成项目的中间设计文件design.json。" + "design.json应包含项目名称、版本、描述、文件列表(含路径、摘要、依赖、函数和类)、建议命令和检查工具。" + "返回严格的 JSON 对象,符合DesignModel结构。" + ) + user_prompt = f"README内容如下:\n\n{self.readme_content}" + + result = self._call_llm(system_prompt, user_prompt) + design_data = result + design = DesignModel(**design_data) + + # 写入design.json文件 + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info(f"已生成design.json: {design_path}") + + return design + + def load_state(self) -> Optional[StateModel]: + """加载断点续写状态""" + if self.state_file.exists(): + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state_data = json.load(f) + self.state = StateModel(**state_data) + logger.info( + f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个" + ) + return self.state + except Exception as e: + logger.error(f"加载状态失败: {e}") + self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") + return None + return None + + def save_state( + self, generated_files: List[str], dependencies_map: Dict[str, List[str]] + ) -> None: + """保存断点续写状态,适应并发生成(线程安全)""" + with self._state_lock: # 串行化写入 + state = StateModel( + current_file_index=0, + generated_files=generated_files, + dependencies_map=dependencies_map, + total_files=len(self.design.files) if self.design else 0, + 
output_dir=str(self.output_dir), + readme_path=self.readme_content[:100] if self.readme_content else "", + ) + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) + logger.debug(f"状态已保存: {self.state_file}") + + def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: + """ + 从design.json获取文件列表和依赖关系 + + Returns: + (files, dependencies) + files: 按顺序需要生成的文件路径列表 + dependencies: 字典 {file: [依赖文件路径]} + """ + if not self.design: + raise ValueError("design.json未加载,请先调用generate_design_json") + + files = [file.path for file in self.design.files] + dependencies = {file.path: file.dependencies for file in self.design.files} + + logger.info(f"从design.json解析到 {len(files)} 个待生成文件") + logger.debug(f"文件列表: {files}") + logger.debug(f"依赖关系: {dependencies}") + + return files, dependencies + + def _add_implicit_dependencies( + self, files: List[str], dependencies: Dict[str, List[str]] + ) -> Dict[str, List[str]]: + """ + 添加隐式依赖关系,基于文件路径和常见模式 + + Args: + files: 文件路径列表 + dependencies: 原始依赖字典 + + Returns: + Dict[str, List[str]]: 增强后的依赖字典 + """ + enhanced = dependencies.copy() + for file in files: + if file not in enhanced: + enhanced[file] = [] + # 添加同一目录下的其他文件作为隐式依赖(简单示例) + path = Path(file) + implicit_deps = [ + f + for f in files + if f != file + and Path(f).parent == path.parent + and f not in enhanced[file] + ] + if implicit_deps: + enhanced[file].extend(implicit_deps) + logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}") + return enhanced + + def generate_file( + self, + file_path: str, + prompt_instruction: str, + dependency_files: List[str], + existing_content: Optional[str] = None, + output_format: str = "full", # 新增参数,默认 'full' + ) -> Tuple[str, str, List[str]]: + """ + 生成单个文件,返回 (代码, 描述, 命令列表) + + Args: + file_path: 目标文件路径 + prompt_instruction: 生成指令 + dependency_files: 依赖文件列表(用于上下文) + existing_content: 文件现有内容(若为修改模式) + output_format: 输出格式,'full',来自 models.py + """ + # 收集上下文内容 + context_content 
= [] + + if self.readme_content: + context_content.append(f"### 项目 README ###\n{self.readme_content}\n") + + # 添加 design.json 上下文 + design_path = self.output_dir / "design.json" + if design_path.exists(): + try: + with open(design_path, "r", encoding="utf-8") as f: + design_content = f.read() + context_content.append( + f"### 设计文件: design.json ###\n{design_content}\n" + ) + except Exception as e: + logger.error(f"读取design.json失败: {e}") + self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]") + # 如果design.json读取失败,可能无法继续,但保持上下文为空或部分 + + # 添加依赖文件内容(仅读取存在的文件) + for dep in dependency_files: + dep_path = Path(dep) + if not dep_path.exists(): + alt_path = self.output_dir / dep + if alt_path.exists(): + dep_path = alt_path + else: + logger.warning(f"依赖文件不存在,已跳过: {dep}") + self.console.print( + f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]" + ) + continue + + try: + with open(dep_path, "r", encoding="utf-8") as f: + content = f.read() + context_content.append( + f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n" + ) + except Exception as e: + logger.error(f"读取依赖文件 {dep} 失败: {e}") + self.console.print( + f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]" + ) + # 跳过此依赖文件 + + # 如果有现有内容,也加入上下文 + if existing_content is not None: + context_content.append( + f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n" + ) + + full_context = "\n".join(context_content) + + # output_format 为 'full' 或其他,保持现有逻辑 + if existing_content is not None: + system_prompt = ( + "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。" + "返回严格的 JSON 对象,包含四个字段:\n" + "- code: (string) 修改后的完整代码\n" + "- description: (string) 简短的中文修改描述\n" + "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" + "- output_format: (string) 应为 'full'" + ) + else: + system_prompt = ( + "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。" + "返回严格的 JSON 对象,包含四个字段:\n" + "- code: (string) 生成的完整代码\n" + "- description: (string) 简短的中文功能描述\n" + "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" + "- 
output_format: (string) 应为 'full'" + ) + + user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" + + try: + result = self._call_llm(system_prompt, user_prompt) + code = result.get("code") + description = result.get("description", "") + commands = result.get("commands", []) + result.get("output_format", "full") + if code is None: + raise ValueError("LLM 响应中没有 code 字段") + return code, description, commands + except Exception as e: + logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") + self.console.print( + f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]" + ) + # 返回默认值以便继续 + return "# 生成失败,请检查日志", "生成失败,发生错误", [] + + def _generate_file_task( + self, file_path: str, dependencies: List[str], generated_files: set + ) -> Tuple[bool, str]: + """ + 并发任务函数,用于生成单个文件 + + Args: + file_path: 文件路径 + dependencies: 依赖文件列表 + generated_files: 已生成文件的集合(用于上下文) + + Returns: + Tuple[bool, str]: (是否成功, 错误信息或空字符串) + """ + try: + instruction = ( + f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" + ) + # 过滤依赖文件,只使用已生成的 + available_deps = [dep for dep in dependencies if dep in generated_files] + code, desc, commands = self.generate_file( + file_path, instruction, available_deps + ) + logger.info(f"生成完成: {file_path} - {desc}") + + # 写入文件 + output_path = self.output_dir / file_path + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(code) + logger.info(f"已写入: {output_path}") + + # 执行命令 + for cmd in commands: + logger.info(f"准备执行命令: {cmd}") + success = self.execute_command(cmd, cwd=self.output_dir) + if not success: + logger.warning(f"命令执行失败,但继续处理: {cmd}") + return True, "" + except Exception as e: + logger.error(f"生成文件 {file_path} 失败: {e}") + return False, str(e) + + def _topological_sort( + self, files: List[str], dependencies: Dict[str, List[str]] + ) -> List[str]: + """ + 对文件列表进行拓扑排序,基于依赖关系。 + 返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。 + 如果检测到循环依赖,抛出ValueError。 + """ + from collections import deque + + # 初始化入度和反向邻接表 + 
in_degree = {f: 0 for f in files} + rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f + + # 构建图:如果文件f依赖于dep,则增加f的入度,并将f加入rev_graph[dep] + for f in files: + for dep in dependencies.get(f, []): + if dep in files: # 只考虑在files中的依赖 + in_degree[f] += 1 # f依赖于dep,所以f的入度增加 + rev_graph[dep].append(f) # dep被f依赖 + + # 队列初始化为入度为0的文件(无依赖的文件) + queue = deque([f for f in files if in_degree[f] == 0]) + sorted_files = [] + + while queue: + node = queue.popleft() + sorted_files.append(node) + # 所有依赖于node的文件入度减1 + for dependent in rev_graph[node]: + in_degree[dependent] -= 1 + if in_degree[dependent] == 0: + queue.append(dependent) + + # 检查是否所有文件都已排序(无循环依赖) + if len(sorted_files) != len(files): + raise ValueError( + f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。" + ) + + return sorted_files + + def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: + """ + 执行单个命令,检查风险,失败仅记录错误不抛出异常 + + Returns: + bool: 命令是否成功执行 + """ + dangerous, reason = is_dangerous_command(cmd) + if dangerous: + logger.error(f"危险命令被阻止: {cmd},原因: {reason}") + self.console.print( + f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]" + ) + return False + + logger.info(f"执行命令: {cmd}") + try: + result = subprocess.run( + cmd, + shell=True, + cwd=cwd or self.output_dir, + capture_output=True, + text=True, + timeout=300, # 5分钟超时 + ) + logger.debug(f"命令返回码: {result.returncode}") + if result.stdout: + logger.debug(f"stdout: {result.stdout[:500]}") + if result.stderr: + logger.warning(f"stderr: {result.stderr[:500]}") + if result.returncode != 0: + logger.error(f"命令执行失败,返回码: {result.returncode}") + self.console.print( + f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]" + ) + return False + return True + except subprocess.TimeoutExpired: + logger.error(f"命令执行超时: {cmd}") + self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]") + return False + except Exception as e: + logger.error(f"命令执行失败: {e}") + self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]") + return False + + def 
_analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]: + """ + 调用 LLM 分析工单,返回结构化变更计划 + """ + system_prompt = ( + "你是一个软件架构师。根据用户提供的工单内容和现有项目设计文件(design.json)," + "分析需要进行的代码变更。返回严格的 JSON 对象,包含以下字段:\n" + "- affected_files: 数组,每个元素为一个对象,包含:\n" + " - path: 文件路径(相对于项目根目录)\n" + " - action: 'create' 或 'modify'\n" + " - description: 对此文件变更的简短描述\n" + " - dependencies: 此文件可能依赖的其他文件路径列表(可选)\n" + "- design_updates: 对象,描述对 design.json 的更新,例如新增的文件条目、修改的摘要等(可选)\n" + "注意:仅返回 JSON,不要包含其他文本。" + ) + + # 将现有 design.json 内容作为上下文的一部分 + if not self.design: + design_path = self.output_dir / "design.json" + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + raise e + design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False) + user_prompt = ( + f"工单类型: {issue_type}\n" + f"工单内容:\n{issue_content}\n\n" + f"现有设计文件 (design.json):\n{design_str}" + ) + + result = self._call_llm(system_prompt, user_prompt, temperature=0.2) + return result + + def _update_design( + self, generated_files: List[str], design_updates: Dict[str, Any] + ): + """ + 根据生成的变更更新 design.json + 使用 FileModel 来处理文件信息 + """ + updated = False + + # 处理新增文件 + for file_path in generated_files: + # 检查文件是否已在 design.files 中 + exists = any(f.path == file_path for f in self.design.files) + if not exists: + # 获取更新信息 + update_info = design_updates.get(file_path, {}) + + # 创建新文件条目(FileModel实例) + new_file = FileModel( + path=file_path, + summary=update_info.get("summary", "自动生成的新文件"), + dependencies=update_info.get("dependencies", []), + functions=update_info.get("functions", []), + classes=update_info.get("classes", []), + design_updates=update_info.get("design_updates", {}), + ) + self.design.files.append(new_file) + updated = True + logger.info(f"已将新文件 {file_path} 添加到 design.json") + + 
# 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要) + # 这里可根据实际需求扩展,当前仅处理新增文件 + + if updated: + # 保存更新后的 design.json + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info("design.json 已更新") + + def refresh_design(self) -> bool: + """ + 重新生成design.json,基于当前README内容或加载的design.json + 返回bool表示是否成功 + """ + logger.info("开始刷新design.json") + if not self.readme_content: + # 尝试读取README.md文件 + readme_path = self.output_dir / "README.md" + if readme_path.exists(): + try: + self.readme_content = self.parse_readme(readme_path) + except Exception as e: + logger.error(f"读取README.md失败,无法刷新design: {e}") + self.console.print( + f"[bold red]❌ 读取README.md失败,无法刷新design: {e}[/bold red]" + ) + return False + else: + logger.error("没有README内容,且README.md文件不存在,无法刷新design") + self.console.print( + "[bold red]❌ 没有README内容,且README.md文件不存在,无法刷新design[/bold red]" + ) + return False + + try: + self.design = self.generate_design_json() + logger.info("design.json已成功重新生成") + self.console.print("[green]✅ design.json已重新生成[/green]") + return True + except Exception as e: + logger.error(f"重新生成design.json失败: {e}") + self.console.print(f"[bold red]❌ 重新生成design.json失败: {e}[/bold red]") + return False + + def update_file_entry(self, file_path: str, file_content: str) -> bool: + """ + 更新design.json中单个文件的条目,基于提供的文件内容 + 返回bool表示是否成功 + """ + logger.info(f"开始更新design.json中文件条目: {file_path}") + if not self.design: + # 加载现有design.json + design_path = self.output_dir / "design.json" + if not design_path.exists(): + logger.error(f"design.json不存在于 {self.output_dir}") + self.console.print( + f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" + ) + return False + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + 
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + return False + + # 调用LLM分析文件内容,返回更新信息,增强以支持design_updates字段 + system_prompt = ( + "你是一个软件架构师。分析给定的文件内容,并返回对design.json中该文件条目的更新。" + "返回严格的JSON对象,包含以下字段:\n" + "- summary: 文件的新摘要\n" + "- dependencies: 依赖文件列表\n" + "- functions: 函数列表,每个对象有name, summary, inputs, outputs\n" + "- classes: 类列表,每个对象有name, summary, methods\n" + "- design_updates: 可选,设计更新字典\n" + "注意:仅返回JSON,不要其他文本。" + ) + # 准备当前design.json中该文件的条目信息 + current_entry = None + for f in self.design.files: + if f.path == file_path: + current_entry = f.model_dump() + break + user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目(如果存在):\n{json.dumps(current_entry, indent=2) if current_entry else '无'}" + + try: + result = self._call_llm(system_prompt, user_prompt, temperature=0.2) + update_info = result + + # 查找或创建文件条目 + file_model = None + for f in self.design.files: + if f.path == file_path: + file_model = f + break + if file_model is None: + # 创建新条目,包括design_updates + new_file = FileModel( + path=file_path, + summary=update_info.get("summary", ""), + dependencies=update_info.get("dependencies", []), + functions=update_info.get("functions", []), + classes=update_info.get("classes", []), + design_updates=update_info.get("design_updates", {}), # 新增design_updates处理 + ) + self.design.files.append(new_file) + logger.info(f"在design.json中创建了新文件条目: {file_path}") + else: + # 更新现有条目,使用merge_design_updates处理design_updates + if 'design_updates' in update_info: + file_model.merge_design_updates(update_info['design_updates']) + # 更新其他字段 + file_model.summary = update_info.get("summary", file_model.summary) + file_model.dependencies = update_info.get( + "dependencies", file_model.dependencies + ) + file_model.functions = update_info.get( + "functions", file_model.functions + ) + file_model.classes = update_info.get("classes", file_model.classes) + logger.info(f"更新了design.json中的文件条目: {file_path}") + + # 保存更新后的design.json + design_path = 
self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info(f"design.json已更新,文件条目: {file_path}") + self.console.print( + f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]" + ) + return True + except Exception as e: + logger.error(f"更新文件条目失败: {e}") + self.console.print(f"[bold red]❌ 更新文件条目失败: {e}[/bold red]") + return False + + def sync_readme(self) -> bool: + """ + 同步README.md和design.json,确保内容一致性 + 返回bool表示是否成功 + """ + logger.info("开始同步README.md和design.json") + # 读取README.md + readme_path = self.output_dir / "README.md" + if not readme_path.exists(): + logger.error(f"README.md不存在于 {self.output_dir}") + self.console.print( + f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]" + ) + return False + try: + with open(readme_path, "r", encoding="utf-8") as f: + readme_content = f.read() + except Exception as e: + logger.error(f"读取README.md失败: {e}") + self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") + return False + + # 加载design.json + design_path = self.output_dir / "design.json" + if not design_path.exists(): + logger.error(f"design.json不存在于 {self.output_dir}") + self.console.print( + f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" + ) + return False + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + return False + + # 调用LLM比较和同步 + system_prompt = ( + "你是一个软件架构师。比较README.md内容和design.json,识别不一致之处,并建议更新。" + "返回严格的JSON对象,包含以下字段:\n" + "- needs_update: bool, 是否需要更新\n" + "- update_type: 'readme' 或 'design' 或 'both', 指示哪个需要更新\n" + "- updates: 对象,描述具体的更新内容\n" + "注意:仅返回JSON,不要其他文本。" + ) + user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}" + + try: + result = 
self._call_llm(system_prompt, user_prompt, temperature=0.2) + needs_update = result.get("needs_update", False) + if not needs_update: + logger.info("README.md和design.json已同步,无需更新") + self.console.print( + "[green]✅ README.md和design.json已同步,无需更新[/green]" + ) + return True + + update_type = result.get("update_type", "") + updates = result.get("updates", {}) + if update_type == "readme": + # 更新README.md + new_readme = updates.get("new_readme", readme_content) + with open(readme_path, "w", encoding="utf-8") as f: + f.write(new_readme) + logger.info("已更新README.md") + self.console.print("[green]✅ README.md已更新[/green]") + elif update_type == "design": + # 更新design.json + new_design_data = updates.get("new_design", design.model_dump()) + design = DesignModel(**new_design_data) + with open(design_path, "w", encoding="utf-8") as f: + json.dump(new_design_data, f, indent=2, ensure_ascii=False) + logger.info("已更新design.json") + self.console.print("[green]✅ design.json已更新[/green]") + elif update_type == "both": + # 更新两者 + new_readme = updates.get("new_readme", readme_content) + new_design_data = updates.get("new_design", design.model_dump()) + with open(readme_path, "w", encoding="utf-8") as f: + f.write(new_readme) + design = DesignModel(**new_design_data) + with open(design_path, "w", encoding="utf-8") as f: + json.dump(new_design_data, f, indent=2, ensure_ascii=False) + logger.info("已同步更新README.md和design.json") + self.console.print("[green]✅ README.md和design.json已同步更新[/green]") + else: + logger.warning(f"未知的update_type: {update_type}") + self.console.print( + f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]" + ) + return False + return True + except Exception as e: + logger.error(f"同步README.md失败: {e}") + self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") + return False +``` \ No newline at end of file diff --git a/src/llm_codegen/cli.py b/src/llm_codegen/cli.py index 9172bde..e88afa9 100644 --- a/src/llm_codegen/cli.py +++ b/src/llm_codegen/cli.py @@ -13,7 
+13,6 @@ from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn from loguru import logger -from .core import BaseGenerator from .init_generator import InitGenerator from .enhance_generator import EnhanceGenerator from .fix_generator import FixGenerator diff --git a/src/llm_codegen/command_executor.py b/src/llm_codegen/command_executor.py new file mode 100644 index 0000000..ada8c5d --- /dev/null +++ b/src/llm_codegen/command_executor.py @@ -0,0 +1,51 @@ +import subprocess +import os +from typing import Optional, Tuple +from loguru import logger +from .utils import is_dangerous_command + + +class CommandExecutor: + """ + 命令执行器,负责执行系统命令并集成危险命令拦截。 + """ + + def __init__(self): + pass + + def execute(self, cmd: str, cwd: Optional[str] = None) -> Tuple[bool, str]: + """ + 执行系统命令,在执行前检查是否危险。 + + Args: + cmd: 要执行的命令字符串。 + cwd: 工作目录路径,如果为 None 则使用当前目录。 + + Returns: + Tuple[bool, str]: (执行是否成功, 输出或错误消息)。 + """ + # 检查命令是否危险 + is_dangerous, reason = is_dangerous_command(cmd) + if is_dangerous: + logger.warning(f"危险命令被拦截: {cmd}, 原因: {reason}") + return False, f"命令危险被拦截: {reason}" + + try: + # 执行命令 + result = subprocess.run( + cmd, + shell=True, # 使用 shell 执行命令字符串 + cwd=cwd, + capture_output=True, + text=True, + encoding='utf-8' + ) + if result.returncode == 0: + logger.info(f"命令执行成功: {cmd}") + return True, result.stdout + else: + logger.error(f"命令执行失败: {cmd}, 错误: {result.stderr}") + return False, result.stderr + except Exception as e: + logger.error(f"执行命令时发生异常: {cmd}, 异常: {e}") + return False, str(e) diff --git a/src/llm_codegen/core.py b/src/llm_codegen/core.py index 95d569d..be7daa4 100644 --- a/src/llm_codegen/core.py +++ b/src/llm_codegen/core.py @@ -14,17 +14,25 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID from loguru import logger from openai import OpenAI -from .utils import is_dangerous_command +from .utils import is_dangerous_command, read_file as utils_read_file, write_file as 
utils_write_file, ensure_dir, safe_join, log_error, is_fatal_error, build_dependency_graph, compute_in_degrees, topological_sort as utils_topological_sort, create_progress_bar from .models import ( DesignModel, StateModel, FileModel, FileStatus, -) # 添加 FileStatus 导入 + LLMResponse, + OutputFormat +) +from .llm_client import LLMClient +from .file_operations import handle_llm_response, generate_diff, apply_diff +from .command_executor import CommandExecutor +from .dependency_sorter import topological_sort as dependency_topological_sort, detect_cycles +from .design_manager import DesignManager +from .state_manager import StateManager class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 - """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" + """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行,使用组合模式集成模块""" def __init__( self, @@ -49,14 +57,11 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 if not self.api_key: raise ValueError("必须提供API密钥,或设置环境变量DEEPSEEK_APIKEY") - self.client = OpenAI(api_key=self.api_key, base_url=base_url) - self.model = model self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.state_file = self.output_dir / ".llm_generator_state.json" self.console = Console() # 添加console实例用于rich打印 self._state_lock = threading.Lock() - self.max_concurrency = max_concurrency # 配置日志 @@ -67,6 +72,18 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG logger.info(f"日志已初始化,保存至: {log_file}") + # 初始化模块实例(组合模式) + self.llm_client = LLMClient( + api_key=self.api_key, + model=model, + base_url=base_url if base_url != "https://api.deepseek.com" else None, # LLMClient 处理默认URL + log_level="INFO" + ) + self.command_executor = CommandExecutor() + self.design_manager = DesignManager(design_file_path=str(self.output_dir / "design.json")) + self.state_manager = StateManager(state_file_path=str(self.state_file)) + # file_operations 和 dependency_sorter 作为函数模块直接使用 + self.readme_content = None self.design: 
Optional[DesignModel] = None self.state: Optional[StateModel] = None @@ -81,66 +98,24 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 expect_json: bool = True, ) -> Dict[str, Any]: """ - 调用LLM并返回解析后的JSON + 调用LLM并返回解析后的JSON,使用 LLMClient 模块 """ - logger.debug(f"调用LLM,模型: {self.model}") - + logger.debug(f"调用LLM,模型: {self.llm_client.model}") try: - response = self.client.chat.completions.create( - model=self.model, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], + llm_response = self.llm_client.call( + system_prompt=system_prompt, + user_prompt=user_prompt, temperature=temperature, - response_format={"type": "json_object"} if expect_json else None, + expect_json=expect_json ) - - message = response.choices[0].message - content = message.content - - # 记录思考过程(如果存在) - reasoning_content = None - if hasattr(message, "reasoning_content") and message.reasoning_content: - reasoning_content = message.reasoning_content - logger.info("模型思考过程已记录") - - # 创建响应目录 - responses_dir = self.output_dir / "llm_responses" - responses_dir.mkdir(parents=True, exist_ok=True) - - # 生成文件名(使用当前时间) - timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS") - response_file = responses_dir / f"response_{timestamp}.json" - - # 保存响应到JSON文件 - response_data = { - "timestamp": timestamp, - "model": self.model, - "content": content, - "reasoning_content": reasoning_content, - "system_prompt": system_prompt, - "user_prompt": user_prompt, - "temperature": temperature, - "expect_json": expect_json, + # 转换为字典以保持接口兼容 + result = { + "code": llm_response.code, + "description": llm_response.description, + "commands": llm_response.commands, + "output_format": llm_response.output_format.value } - - with open(response_file, "w", encoding="utf-8") as f: - json.dump(response_data, f, indent=2, ensure_ascii=False) - - logger.debug(f"LLM原始响应: {response_file.name}") - - if expect_json: - result = json.loads(content) - else: - result = {"content": content} - 
return result - - except json.JSONDecodeError as e: - logger.error(f"JSON解析失败: {e}") - self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]") - raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}") except Exception as e: logger.error(f"LLM调用失败: {e}") self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") @@ -148,12 +123,11 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 def parse_readme(self, readme_path: Path) -> str: """ - 读取README文件内容 + 读取README文件内容,使用 file_operations 模块 """ logger.info(f"读取README文件: {readme_path}") try: - with open(readme_path, "r", encoding="utf-8") as f: - content = f.read() + content = utils_read_file(str(readme_path)) logger.debug(f"README内容长度: {len(content)} 字符") return content except Exception as e: @@ -176,35 +150,30 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 design_data = result design = DesignModel(**design_data) - # 写入design.json文件 - design_path = self.output_dir / "design.json" - with open(design_path, "w", encoding="utf-8") as f: - json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) - logger.info(f"已生成design.json: {design_path}") + # 保存design.json文件 + self.design_manager.save_design(design) + logger.info(f"已生成design.json: {self.design_manager.design_file_path}") return design def load_state(self) -> Optional[StateModel]: - """加载断点续写状态""" - if self.state_file.exists(): - try: - with open(self.state_file, "r", encoding="utf-8") as f: - state_data = json.load(f) - self.state = StateModel(**state_data) + """加载断点续写状态,使用 StateManager 模块""" + try: + self.state = self.state_manager.read_state() + if self.state: logger.info( f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个" ) - return self.state - except Exception as e: - logger.error(f"加载状态失败: {e}") - self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") - return None - return None + return self.state + except Exception as e: + logger.error(f"加载状态失败: {e}") + self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") + return None def save_state( 
self, generated_files: List[str], dependencies_map: Dict[str, List[str]] ) -> None: - """保存断点续写状态,适应并发生成(线程安全)""" + """保存断点续写状态,适应并发生成(线程安全),使用 StateManager 模块""" with self._state_lock: # 串行化写入 state = StateModel( current_file_index=0, @@ -214,8 +183,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 output_dir=str(self.output_dir), readme_path=self.readme_content[:100] if self.readme_content else "", ) - with open(self.state_file, "w", encoding="utf-8") as f: - json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) + self.state_manager.write_state(state) logger.debug(f"状态已保存: {self.state_file}") def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: @@ -228,7 +196,11 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 dependencies: 字典 {file: [依赖文件路径]} """ if not self.design: - raise ValueError("design.json未加载,请先调用generate_design_json") + # 尝试加载设计 + try: + self.design = self.design_manager.load_design() + except Exception as e: + raise ValueError(f"design.json未加载或加载失败: {e}") files = [file.path for file in self.design.files] dependencies = {file.path: file.dependencies for file in self.design.files} @@ -298,8 +270,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 design_path = self.output_dir / "design.json" if design_path.exists(): try: - with open(design_path, "r", encoding="utf-8") as f: - design_content = f.read() + design_content = utils_read_file(str(design_path)) context_content.append( f"### 设计文件: design.json ###\n{design_content}\n" ) @@ -323,8 +294,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 continue try: - with open(dep_path, "r", encoding="utf-8") as f: - content = f.read() + content = utils_read_file(str(dep_path)) context_content.append( f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n" ) @@ -366,13 +336,19 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" try: - result = self._call_llm(system_prompt, user_prompt) - code = 
result.get("code") - description = result.get("description", "") - commands = result.get("commands", []) - result.get("output_format", "full") - if code is None: - raise ValueError("LLM 响应中没有 code 字段") + llm_response = self.llm_client.call( + system_prompt=system_prompt, + user_prompt=user_prompt, + temperature=0.2, + expect_json=True + ) + code = llm_response.code + description = llm_response.description + commands = llm_response.commands + # 使用 handle_llm_response 处理文件写入,支持不同输出格式 + output_path = str(self.output_dir / file_path) + handle_llm_response(output_path, llm_response) + logger.info(f"文件已生成并写入: {output_path}") return code, description, commands except Exception as e: logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") @@ -407,13 +383,6 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 ) logger.info(f"生成完成: {file_path} - {desc}") - # 写入文件 - output_path = self.output_dir / file_path - output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, "w", encoding="utf-8") as f: - f.write(code) - logger.info(f"已写入: {output_path}") - # 执行命令 for cmd in commands: logger.info(f"准备执行命令: {cmd}") @@ -433,39 +402,16 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。 如果检测到循环依赖,抛出ValueError。 """ - from collections import deque - - # 初始化入度和反向邻接表 - in_degree = {f: 0 for f in files} - rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f - - # 构建图:如果文件f依赖于dep,则增加f的入度,并将f加入rev_graph[dep] - for f in files: - for dep in dependencies.get(f, []): - if dep in files: # 只考虑在files中的依赖 - in_degree[f] += 1 # f依赖于dep,所以f的入度增加 - rev_graph[dep].append(f) # dep被f依赖 - - # 队列初始化为入度为0的文件(无依赖的文件) - queue = deque([f for f in files if in_degree[f] == 0]) - sorted_files = [] - - while queue: - node = queue.popleft() - sorted_files.append(node) - # 所有依赖于node的文件入度减1 - for dependent in rev_graph[node]: - in_degree[dependent] -= 1 - if in_degree[dependent] == 0: - queue.append(dependent) - - # 检查是否所有文件都已排序(无循环依赖) - if len(sorted_files) != len(files): - raise 
ValueError( - f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。" - ) - - return sorted_files + try: + sorted_files = dependency_topological_sort(dependencies) + # 确保所有文件都在排序列表中 + if set(sorted_files) != set(files): + logger.warning("依赖图可能不完整,调整排序列表") + sorted_files = [f for f in files if f in sorted_files] + [f for f in files if f not in sorted_files] + return sorted_files + except ValueError as e: + logger.error(f"拓扑排序失败: {e}") + raise def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: """ @@ -474,44 +420,11 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 Returns: bool: 命令是否成功执行 """ - dangerous, reason = is_dangerous_command(cmd) - if dangerous: - logger.error(f"危险命令被阻止: {cmd},原因: {reason}") - self.console.print( - f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]" - ) - return False - - logger.info(f"执行命令: {cmd}") - try: - result = subprocess.run( - cmd, - shell=True, - cwd=cwd or self.output_dir, - capture_output=True, - text=True, - timeout=300, # 5分钟超时 - ) - logger.debug(f"命令返回码: {result.returncode}") - if result.stdout: - logger.debug(f"stdout: {result.stdout[:500]}") - if result.stderr: - logger.warning(f"stderr: {result.stderr[:500]}") - if result.returncode != 0: - logger.error(f"命令执行失败,返回码: {result.returncode}") - self.console.print( - f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]" - ) - return False - return True - except subprocess.TimeoutExpired: - logger.error(f"命令执行超时: {cmd}") - self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]") - return False - except Exception as e: - logger.error(f"命令执行失败: {e}") - self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]") - return False + success, output = self.command_executor.execute(cmd, cwd=str(cwd) if cwd else None) + if not success: + logger.error(f"命令执行失败: {cmd}, 输出: {output}") + self.console.print(f"[bold red]❌ 命令执行失败: {cmd}[/bold red]") + return success def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]: """ @@ 
-531,11 +444,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 # 将现有 design.json 内容作为上下文的一部分 if not self.design: - design_path = self.output_dir / "design.json" try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - self.design = DesignModel(**design_data) + self.design = self.design_manager.load_design() except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") @@ -585,9 +495,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 if updated: # 保存更新后的 design.json - design_path = self.output_dir / "design.json" - with open(design_path, "w", encoding="utf-8") as f: - json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) + self.design_manager.save_design(self.design) logger.info("design.json 已更新") def refresh_design(self) -> bool: @@ -633,17 +541,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 logger.info(f"开始更新design.json中文件条目: {file_path}") if not self.design: # 加载现有design.json - design_path = self.output_dir / "design.json" - if not design_path.exists(): - logger.error(f"design.json不存在于 {self.output_dir}") - self.console.print( - f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" - ) - return False try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - self.design = DesignModel(**design_data) + self.design = self.design_manager.load_design() except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") @@ -706,9 +605,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 logger.info(f"更新了design.json中的文件条目: {file_path}") # 保存更新后的design.json - design_path = self.output_dir / "design.json" - with open(design_path, "w", encoding="utf-8") as f: - json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) + self.design_manager.save_design(self.design) logger.info(f"design.json已更新,文件条目: {file_path}") self.console.print( 
f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]" @@ -734,25 +631,15 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 ) return False try: - with open(readme_path, "r", encoding="utf-8") as f: - readme_content = f.read() + readme_content = utils_read_file(str(readme_path)) except Exception as e: logger.error(f"读取README.md失败: {e}") self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") return False # 加载design.json - design_path = self.output_dir / "design.json" - if not design_path.exists(): - logger.error(f"design.json不存在于 {self.output_dir}") - self.console.print( - f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" - ) - return False try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - design = DesignModel(**design_data) + design = self.design_manager.load_design() except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") @@ -784,27 +671,23 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 if update_type == "readme": # 更新README.md new_readme = updates.get("new_readme", readme_content) - with open(readme_path, "w", encoding="utf-8") as f: - f.write(new_readme) + utils_write_file(str(readme_path), new_readme) logger.info("已更新README.md") self.console.print("[green]✅ README.md已更新[/green]") elif update_type == "design": # 更新design.json new_design_data = updates.get("new_design", design.model_dump()) design = DesignModel(**new_design_data) - with open(design_path, "w", encoding="utf-8") as f: - json.dump(new_design_data, f, indent=2, ensure_ascii=False) + self.design_manager.save_design(design) logger.info("已更新design.json") self.console.print("[green]✅ design.json已更新[/green]") elif update_type == "both": # 更新两者 new_readme = updates.get("new_readme", readme_content) new_design_data = updates.get("new_design", design.model_dump()) - with open(readme_path, "w", encoding="utf-8") as f: - f.write(new_readme) + utils_write_file(str(readme_path), 
"""
Module for dependency sorting and cycle detection.
Provides functions to perform topological sort and detect cycles in a dependency graph.
"""

from collections import defaultdict, deque


def topological_sort(dependencies):
    """
    Perform a topological sort of a dependency graph.

    Args:
        dependencies (dict): Maps a node (e.g. a file path) to the list of
            nodes it depends on, e.g.
            ``{"src/llm_codegen/core.py": ["src/llm_codegen/utils.py"], ...}``.

    Returns:
        list: Nodes ordered so that every node appears AFTER all of its
        dependencies (dependencies first). Nodes that occur only as
        dependencies (never as keys) are included in the result too.

    Raises:
        ValueError: If a cycle is detected, with the offending cycle listed.
    """
    # Kahn's algorithm with edge direction dep -> node: a node becomes
    # eligible only once all of its dependencies have been emitted.
    # Fix: the previous implementation incremented in_degree[dep] instead of
    # in_degree[node], which emitted dependents BEFORE their dependencies —
    # the reverse of the order the code generator needs.
    dependents = defaultdict(list)  # dep -> nodes that depend on it
    in_degree = {}

    for node, deps in dependencies.items():
        in_degree.setdefault(node, 0)
        for dep in deps:
            in_degree.setdefault(dep, 0)
            dependents[dep].append(node)
            in_degree[node] += 1

    # Seed with nodes that have no unmet dependencies.
    queue = deque(node for node, degree in in_degree.items() if degree == 0)
    sorted_nodes = []

    while queue:
        node = queue.popleft()
        sorted_nodes.append(node)
        for dependent in dependents[node]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)

    # Any node left unemitted is part of (or downstream of) a cycle.
    if len(sorted_nodes) != len(in_degree):
        cycle = _detect_cycle_dfs(dependencies)
        raise ValueError(f"Cycle detected in dependency graph: {cycle}")

    return sorted_nodes


def _detect_cycle_dfs(dependencies):
    """
    Find one cycle in the dependency graph via DFS.

    Args:
        dependencies (dict): Same shape as for :func:`topological_sort`.

    Returns:
        list: Nodes forming a cycle (the entry node repeated at the end)
        if one exists, otherwise an empty list.
    """
    graph = defaultdict(list)
    for node, deps in dependencies.items():
        graph[node] = deps[:]

    visited = set()
    rec_stack = set()
    cycle = []

    def dfs(node, path):
        nonlocal cycle
        visited.add(node)
        rec_stack.add(node)
        path.append(node)

        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                if dfs(neighbor, path):
                    return True
            elif neighbor in rec_stack:
                # Back-edge found: the cycle is the tail of the current path.
                start_index = path.index(neighbor)
                cycle = path[start_index:] + [neighbor]
                return True

        rec_stack.remove(node)
        path.pop()
        return False

    for node in list(graph):
        if node not in visited:
            if dfs(node, []):
                return cycle
    return []


def detect_cycles(dependencies):
    """
    Report whether the dependency graph contains a cycle.

    Args:
        dependencies (dict): Same shape as for :func:`topological_sort`.

    Returns:
        tuple: ``(has_cycle, cycle_nodes)``; ``cycle_nodes`` is non-empty
        only when ``has_cycle`` is True.
    """
    cycle = _detect_cycle_dfs(dependencies)
    if cycle:
        return True, cycle
    return False, []
def read_file(file_path: str) -> str:
    """
    Return the UTF-8 text content of *file_path*.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file's content as a string.

    Raises:
        FileNotFoundError: If the file does not exist.
        UnicodeDecodeError: If the content is not valid UTF-8.
    """
    target = pathlib.Path(file_path)
    if target.exists():
        return target.read_text(encoding='utf-8')
    raise FileNotFoundError(f"File not found: {file_path}")


def write_file(file_path: str, content: str) -> None:
    """
    Write *content* to *file_path*, creating parent directories as needed.

    Args:
        file_path: Destination path.
        content: Text to write (UTF-8).
    """
    target = pathlib.Path(file_path)
    # Create any missing parent directories before writing.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content, encoding='utf-8')


def ensure_directory_exists(dir_path: str) -> None:
    """
    Create *dir_path* (including parents) if it does not already exist.

    Args:
        dir_path: Directory path to ensure.
    """
    pathlib.Path(dir_path).mkdir(parents=True, exist_ok=True)


def generate_diff(old_content: str, new_content: str) -> str:
    """
    Build a unified diff from *old_content* to *new_content*.

    Args:
        old_content: Original text.
        new_content: Updated text.

    Returns:
        The diff in unified format as a single string.
    """
    diff_lines = difflib.unified_diff(
        old_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile='old',
        tofile='new',
    )
    return ''.join(diff_lines)
class LLMClient:
    """LLM client wrapping API calls, response persistence and logging."""

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-3.5-turbo",
        base_url: Optional[str] = None,
        log_level: str = "INFO",
    ):
        """
        Initialize the LLM client.

        Args:
            api_key: LLM API key.
            model: Model name, defaults to "gpt-3.5-turbo".
            base_url: Optional custom API endpoint.
            log_level: Logging level name, defaults to "INFO".
        """
        self.api_key = api_key
        self.model = model
        self.base_url = base_url
        # Only pass base_url through when a custom endpoint was requested.
        if self.base_url:
            self.client = openai.OpenAI(api_key=api_key, base_url=base_url)
        else:
            self.client = openai.OpenAI(api_key=api_key)
        # Per-module logger; attach a handler only once so repeated
        # instantiation does not duplicate log lines.
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(getattr(logging, log_level.upper()))
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def call(
        self,
        system_prompt: str,
        user_prompt: str,
        temperature: float = 0.7,
        expect_json: bool = True,
    ) -> LLMResponse:
        """
        Call the LLM and return the parsed response.

        Args:
            system_prompt: System prompt.
            user_prompt: User prompt.
            temperature: Sampling temperature.
            expect_json: Whether a JSON object response is expected.

        Returns:
            LLMResponse object.

        Raises:
            Any API or parsing error is propagated to the caller.
        """
        self.logger.info("Starting LLM call.")
        self.logger.debug(f"System prompt (first 100 chars): {system_prompt[:100]}")
        self.logger.debug(f"User prompt (first 100 chars): {user_prompt[:100]}")
        self.logger.debug(f"Temperature: {temperature}, Expect JSON: {expect_json}")

        try:
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ]
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                response_format={"type": "json_object"} if expect_json else None,
            )
            content = response.choices[0].message.content
            self.logger.info("LLM call successful.")

            if expect_json:
                data = json.loads(content)
                llm_response = LLMResponse(**data)
            else:
                # Non-JSON responses are wrapped into a default LLMResponse.
                llm_response = LLMResponse(
                    code=content,
                    description="Generated from non-JSON response",
                    commands=[],
                    output_format="full",
                )

            self.logger.info(f"Response parsed: {llm_response}")
            # Persist every exchange for auditing/debugging.
            self.save_response(llm_response, system_prompt, user_prompt)
            return llm_response
        except Exception as e:
            self.logger.error(f"LLM call failed: {e}")
            raise

    def save_response(
        self,
        response: LLMResponse,
        system_prompt: str,
        user_prompt: str,
        file_path: Optional[str] = None,
    ):
        """
        Persist an LLM exchange (prompts + parsed response) to a JSON file.

        Args:
            response: LLMResponse object.
            system_prompt: System prompt.
            user_prompt: User prompt.
            file_path: Output path; when omitted a timestamped file under
                ``logs/`` is used.
        """
        if file_path is None:
            import os
            os.makedirs("logs", exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            file_path = f"logs/llm_response_{timestamp}.json"

        data = {
            "timestamp": datetime.now().isoformat(),
            "system_prompt": system_prompt,
            "user_prompt": user_prompt,
            # NOTE(review): .dict() is the pydantic-v1 API; switch to
            # model_dump() if .models is on pydantic v2 — confirm.
            "response": response.dict(),
        }
        # Fix: write UTF-8 explicitly and keep non-ASCII (Chinese prompt
        # text) readable instead of \u-escaped. The locale default encoding
        # raised UnicodeEncodeError on Windows, and every other writer in
        # the project (core, design_manager, state_manager) already uses
        # ensure_ascii=False.
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        self.logger.info(f"Response saved to {file_path}")
indent=2, ensure_ascii=False) + except IOError as e: + raise RuntimeError(f"Failed to write state file: {e}") from e + + def initialize_state(self, total_files: int, output_dir: str, readme_path: str) -> StateModel: + ''' + 初始化状态并保存到文件。 + + 参数: + total_files: 总文件数。 + output_dir: 输出目录。 + readme_path: README 文件路径。 + + 返回: + 初始化的 StateModel 实例。 + ''' + state = StateModel( + current_file_index=0, + generated_files=[], + dependencies_map={}, + file_statuses={}, + total_files=total_files, + output_dir=output_dir, + readme_path=readme_path + ) + self.write_state(state) + return state