diff --git a/README.md b/README.md index 49e4e31..778c4c1 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,6 @@ - 🐞 **自动 Bug 修复**:通过编写**Bug 工单**(如 `bug.issue`),描述问题现象,工具结合代码和错误信息生成修复补丁。 - 🔧 **命令执行**:生成文件后可自动执行建议命令(如安装依赖、运行构建),内置危险命令拦截(执行命令失败不会终止任务,仅记录错误)。 - ✅ **单元测试**:使用 `pytest` 编写测试用例,支持测试覆盖率统计。 -- 🔍 **并行检查**:生成代码后并行运行多个检查工具(`pylint`、`mypy`、`black` 等),收集错误信息。 -- 🔄 **自修复**:将检查错误、README、design.json 和相关代码提交给 LLM,自动生成修复补丁并应用。 - ⏯️ **断点续写**:生成过程中断后可自动从上次中断处继续,状态保存在 `.llm_generator_state.json`。 - 🖥️ **命令行工具**:提供 `llm-codegen` 命令,支持多种操作模式。 - 📝 **详细日志**:所有操作、LLM 响应、错误均通过 `loguru` 记录到文件。 @@ -200,52 +198,7 @@ llm-codegen design project_readme.md -o ./my_design 4. 应用变更,更新 `design.json` 中的摘要(如果新增了函数/类)。 5. 执行检查与修复。 -## 📊 Diff 输出格式支持 -工具支持在生成代码变更时输出 diff 格式,便于代码审查和集成到版本控制系统。diff 输出以标准 unified diff 格式呈现,适用于 `enhance` 和 `fix` 操作。 - -### 字段描述 - -diff 输出包含以下关键字段: - -- **文件名**:变更的文件路径,如 `src/llm_codegen/core.py`。 -- **行号**:变更发生的行号范围,使用 `@@` 标记表示。 -- **旧代码**:被修改或删除的代码行,以 `-` 开头。 -- **新代码**:新增或修改后的代码行,以 `+` 开头。 -- **变更类型**:隐含在 diff 中,如添加(只有 `+` 行)、删除(只有 `-` 行)、修改(同时有 `-` 和 `+` 行)。 - -### 使用示例 - -运行 `llm-codegen enhance` 或 `llm-codegen fix` 时,通过 `--diff` 选项启用 diff 输出。例如: - -```bash -llm-codegen enhance feature.issue -o ./project --diff -``` - -输出示例: - -```diff ---- a/src/llm_codegen/core.py -+++ b/src/llm_codegen/core.py -@@ -10,7 +10,7 @@ - def generate_file(self, file_path, prompt_instruction, dependency_files): - # 生成代码逻辑 - code = self._call_llm(...) -- commands = [] -+ commands = ["安装依赖"] - return code, commands -``` - -### 注意事项 - -- diff 输出功能仅适用于增强(`enhance`)和修复(`fix`)操作,初始化(`init`)操作不产生 diff,因为它是从头生成。 -- 确保使用支持 diff 格式的工具(如 `git diff`、`diff` 命令)查看和应用变更。 -- 如果不需要 diff 输出,可以省略 `--diff` 选项,工具将直接应用变更到文件。 -- diff 输出不影响工具的核心功能,仅为可选辅助特性。 - -### 内部实现 - -工具在生成 diff 输出后,内部使用 `src/llm_codegen/diff_applier.py` 模块来解析和应用 diff 到代码文件。该模块负责读取 diff 格式,验证变更,并安全地更新文件。开发者可以查看此模块的代码以了解更多细节,例如如何与 LLM 响应集成和确保变更的正确性。 ## 📝 工单模板 @@ -311,11 +264,12 @@ uv pip install -e ".[dev]" │ └── llm_codegen/ # 主代码包 │ ├── __init__.py │ ├── cli.py # 命令行入口(typer) -│ ├── core.py # 核心生成逻辑(CodeGenerator 类) -│ ├── checker.py # 并行检查与修复模块 +│ ├── core.py # 核心生成逻辑(BaseGenerator 类) +│ ├── enhance_generator.py +│ ├── fix_generator.py +│ ├── init_generator.py │ ├── utils.py # 工具函数(危险命令判断、文件操作) │ └── models.py # 数据模型(Pydantic) -│ └── diff_applier.py # 应用llm返回的diff ├── tests/ # 单元测试 │ ├── __init__.py │ ├── test_cli.py diff --git a/design.json b/design.json index af53428..01b5600 100644 --- a/design.json +++ b/design.json @@ -3,6 +3,17 @@ "version": "1.0.0", "description": "一个基于大语言模型的智能代码生成与维护工具,支持自动生成、增量添加功能和自动修复Bug。", "files": [ + { + "path": "README.md", + "summary": "项目说明文档,包含项目概述、功能介绍和使用说明", + "dependencies": [ + "src/llm_codegen/cli.py" + ], + "functions": [], + "classes": [], + "design_updates": {} + }, + { "path": "pyproject.toml", "summary": "项目元数据、依赖配置和脚本入口", @@ -14,7 +25,9 @@ { "path": "src/llm_codegen/__init__.py", "summary": "包初始化文件", - "dependencies": [], + "dependencies": [ + "src/llm_codegen/core.py" + ], "functions": [], "classes": [], "design_updates": {} @@ -42,7 +55,6 @@ "summary": "核心生成逻辑,包含CodeGenerator类", "dependencies": [ "src/llm_codegen/utils.py", - "src/llm_codegen/diff_applier.py", "src/llm_codegen/models.py" ], "functions": [ @@ -127,17 +139,6 @@ ], "design_updates": {} }, - { - "path": "src/llm_codegen/checker.py", - "summary": "并行检查与修复模块,运行检查工具并收集错误", - "dependencies": [ - "src/llm_codegen/core.py", - "src/llm_codegen/models.py" - ], - "functions": [], - "classes": [], - "design_updates": {} - }, { "path": "src/llm_codegen/utils.py", "summary": "工具函数,如危险命令判断和文件操作", @@ -287,6 +288,22 @@ } ], "design_updates": {} + }, + { + "path": "src/llm_codegen/design_generator.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} + }, + { + "path": "tests/test_design_generator.py", + "summary": "自动生成的新文件", + "dependencies": [], + "functions": [], + "classes": [], + "design_updates": {} } ], "commands": [ diff --git a/issues/redesign-design-command.issue b/issues/redesign-design-command.issue new file mode 100644 index 0000000..e79afb4 --- /dev/null +++ b/issues/redesign-design-command.issue @@ -0,0 +1,49 @@ +name: redesign-design-command.issue +description: | + 当前 `llm-codegen design` 子命令仅能根据 README 文件生成 `design.json`,功能单一,无法满足项目演进过程中对设计文件进行增量维护的需求。 + 在实际开发中,开发者可能手动修改了代码文件,或者通过其他工具生成了代码,需要将这些变更同步回 `design.json` 以保持设计文档与代码的一致性。 + 因此,需要重新设计 `design` 子命令,使其具备以下能力: + - 支持指定一个或多个源文件(Python 文件)作为输入,分析其内容并更新 `design.json` 中对应文件的条目(如摘要、依赖、函数列表、类列表等)。 + - 如果指定的源文件在 `design.json` 中不存在,则自动添加新的文件条目。 + - 如果未指定任何源文件,则默认仍然可以基于 README 生成初始 `design.json`(即保留原有功能)。 + - 支持 `--force` 选项以强制覆盖现有条目,而不是合并更新。 + - 提供 `--dry-run` 选项预览将要进行的更改而不实际写入文件。 + - 所有分析过程仍通过 LLM 调用完成,确保生成的摘要、函数/类描述准确反映代码内容。 + - 更新后的 `design.json` 应保持原有结构,且格式良好(缩进等)。 + + 这一改进将使 `design` 命令成为维护项目设计文档的实用工具,而不仅仅是初始化工具。 + +affected_files: + - src/llm_codegen/cli.py + - src/llm_codegen/core.py + - src/llm_codegen/design_generator.py # 新建 + - src/llm_codegen/models.py # 可能调整 FileModel 或新增方法 + +acceptance_criteria: + - 新增命令行选项: + - `--source` / `-s`:可重复使用,指定一个或多个源文件路径(相对于项目根目录)。 + - `--force`:强制覆盖现有条目,而不是合并(合并策略见下文)。 + - `--dry-run`:仅显示将要做的更改,不实际写入文件。 + - 如果同时指定 `--source` 和 `--file`(原来的 README 参数),则行为需明确定义(例如先基于 README 生成框架,再根据源文件更新,或报错提示只能使用一种模式)。 + - 当指定 `--source` 时,程序应: + 1. 读取现有的 `design.json`(如果存在),否则视为空设计。 + 2. 对于每个源文件,读取其内容。 + 3. 调用 LLM 分析该文件内容,生成该文件的 `FileModel` 条目(包括 `summary`、`dependencies`、`functions`、`classes`)。 + 4. 将生成的条目与现有条目合并(除非使用 `--force`): + - 合并规则:如果现有条目存在,则保留原有字段,仅更新 LLM 提供的字段(例如,如果 LLM 只返回了 `functions`,则只更新 `functions`,保留原有的 `summary` 和 `dependencies`;如果 LLM 返回了完整信息,则全部更新)。建议在 prompt 中要求 LLM 返回完整的文件条目,但由程序决定合并逻辑。 + - 如果使用 `--force`,则直接替换整个文件条目。 + 5. 如果文件在 `design.json` 中不存在,则直接添加新条目。 + 6. 所有文件处理完成后,保存更新后的 `design.json`。 + - 当未指定 `--source` 时,行为与旧版一致:根据 README 生成完整的 `design.json`(覆盖原有)。 + - 为了保证 LLM 分析的质量,system prompt 应明确要求返回符合 `FileModel` 结构的 JSON,并提供示例。 + - 更新 `cli.py` 中的 `design` 命令,添加上述选项,并调用相应的后端逻辑。 + - 在 `core.py` 中现有 `update_file_entry` 方法可被复用或增强,但建议将设计相关的逻辑迁移到新的 `DesignGenerator` 类中(或直接在 `core.py` 中扩展),以保持代码清晰。 + - 添加单元测试覆盖以下场景: + - 从单个源文件更新现有条目。 + - 从多个源文件同时更新(包括新增和修改)。 + - 使用 `--force` 覆盖已有条目。 + - 使用 `--dry-run` 不实际写入。 + - 未指定 `--source` 时仍能基于 README 生成。 + - 处理不存在的源文件时给出友好错误提示。 + - 确保与现有 `enhance`、`fix` 命令兼容,不会影响它们的正常功能。 + - 所有代码通过 lint 和类型检查,日志记录完整。 \ No newline at end of file diff --git a/issues/refactor-split-core.issue b/issues/refactor-split-core.issue new file mode 100644 index 0000000..d816bc5 --- /dev/null +++ b/issues/refactor-split-core.issue @@ -0,0 +1,33 @@ +# 需求工单:拆分 core.py 为多个专注模块 +name: 重构 core.py,拆分为多个单一职责的模块 +description: | + 当前 core.py 文件过于庞大,包含 LLM 调用、文件操作、命令执行、依赖排序、状态管理、设计文件维护等多个职责,导致代码难以维护和测试。需要将其拆分为多个独立的模块,每个模块负责一个清晰的功能领域,并通过组合方式在 BaseGenerator 中集成。 + + 主要拆分目标: + - 创建 `llm_client.py`:封装 LLM API 调用、响应保存和思考过程记录。 + - 创建 `file_operations.py`:处理文件读写、目录创建和 diff 应用。 + - 创建 `command_executor.py`:执行系统命令,集成危险命令拦截。 + - 创建 `dependency_sorter.py`:提供依赖关系的拓扑排序及循环检测。 + - 创建 `design_manager.py`:管理 design.json 的加载、保存、更新及同步操作。 + - 创建 `state_manager.py`:管理断点续写状态文件的读写(线程安全)。 + - 精简 `core.py` 中的 BaseGenerator,使其组合以上组件,保留对外接口不变。 + + 同时需要更新 README.md,反映新的模块结构和设计思想。 +affected_files: + - src/llm_codegen/core.py + - src/llm_codegen/llm_client.py # 新增 + - src/llm_codegen/file_operations.py # 新增 + - src/llm_codegen/command_executor.py # 新增 + - src/llm_codegen/dependency_sorter.py # 新增 + - src/llm_codegen/design_manager.py # 新增 + - src/llm_codegen/state_manager.py # 新增 + - README.md +acceptance_criteria: + - 所有原有功能(init、enhance、fix、design 子命令)在重构后行为完全一致,不引入新 bug。 + - core.py 中的 _call_llm、_topological_sort、文件读写、命令执行、状态保存、design 操作等逻辑均迁移至对应新模块,BaseGenerator 仅保留组合与高层流程。 + - 新模块职责单一,相互之间通过明确的接口调用,无循环依赖。 + - 单元测试覆盖核心功能,且原有测试用例全部通过。 + - README.md 中的“项目结构”部分更新为新文件列表,并简要说明各模块职责。 + - 日志记录、进度显示、错误提示等用户体验相关功能保持不变。 + - 代码风格符合项目规范(通过 black、pylint 等检查)。 + - 生成对应的单元测试。 \ No newline at end of file diff --git a/src/llm_codegen/__init__.py b/src/llm_codegen/__init__.py index 73b5ba2..6b5d704 100644 --- a/src/llm_codegen/__init__.py +++ b/src/llm_codegen/__init__.py @@ -9,7 +9,7 @@ __version__ = "1.0.0" __description__ = "一个基于大语言模型的智能代码生成与维护工具" # 导出核心模块以便从包级别导入 -from .core import CodeGenerator +# from .core import CodeGenerator # from .cli import main -__all__ = ["CodeGenerator", "__version__", "__description__"] +__all__ = ["__version__", "__description__"] diff --git a/src/llm_codegen/checker.py b/src/llm_codegen/checker.py deleted file mode 100644 index a3e3a9a..0000000 --- a/src/llm_codegen/checker.py +++ /dev/null @@ -1,499 +0,0 @@ -import json -import subprocess -import sys -from typing import List, Dict, Optional, Tuple, Any -from pathlib import Path -from concurrent.futures import ThreadPoolExecutor, as_completed -import os -import warnings - -from loguru import logger -from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn, TimeRemainingColumn -from .core import CodeGenerator - -# 尝试导入 pathspec(用于精确解析 .gitignore) -try: - from pathspec import PathSpec - from pathspec.patterns import GitWildMatchPattern - HAS_PATHSPEC = True -except ImportError: - HAS_PATHSPEC = False - import fnmatch - warnings.warn( - "pathspec 未安装,将使用简单的通配符匹配处理 .gitignore(可能不完全准确)。" - "建议安装:pip install pathspec" - ) - - -class Checker: - """ - 并行检查与修复模块,运行检查工具(默认 black)并收集错误, - 支持自动调用 LLM 生成修复补丁。 - """ - - def __init__( - self, - output_dir: Path, - check_tools: Optional[List[str]] = None, - code_generator: Optional[CodeGenerator] = None, - api_key: Optional[str] = None, - base_url: str = "https://api.deepseek.com", - model: str = "deepseek-reasoner", - ): - """ - 初始化检查器 - - Args: - output_dir: 项目输出目录,用于查找代码文件和保存检查结果 - check_tools: 检查工具列表,默认为 ["black"]。若传入多个工具,仅使用第一个 - code_generator: CodeGenerator 实例,用于调用 LLM - api_key, base_url, model: 用于创建 CodeGenerator(当 code_generator 为 None 时) - """ - self.output_dir = Path(output_dir) - - # 处理检查工具:默认 black,若传入多个则取第一个并警告 - if check_tools is None: - self.check_tools = ["black"] - else: - if len(check_tools) > 1: - logger.warning( - f"检测到多个检查工具 {check_tools},将只使用第一个:{check_tools[0]}" - ) - self.check_tools = [check_tools[0]] if check_tools else ["black"] - - if code_generator: - self.code_generator = code_generator - else: - self.code_generator = CodeGenerator( - api_key=api_key, - base_url=base_url, - model=model, - output_dir=str(self.output_dir), - ) - - self.results_file = self.output_dir / "check_results.json" - logger.info(f"Checker 初始化完成,输出目录: {self.output_dir},检查工具: {self.check_tools}") - - def _load_gitignore_patterns(self) -> Optional[Any]: - """ - 加载 .gitignore 文件中的模式,返回一个可用于匹配的函数或对象。 - 若文件不存在或解析失败,返回 None。 - """ - gitignore_path = self.output_dir / ".gitignore" - if not gitignore_path.exists(): - return None - - try: - with open(gitignore_path, "r", encoding="utf-8") as f: - lines = f.readlines() - except Exception as e: - logger.warning(f"读取 .gitignore 失败: {e}") - return None - - if HAS_PATHSPEC: - # 使用 pathspec 精确解析 - try: - spec = PathSpec.from_lines(GitWildMatchPattern, lines) - return spec - except Exception as e: - logger.warning(f"解析 .gitignore 失败,将使用简单匹配: {e}") - return None - else: - # 回退到简单通配符匹配(忽略空行、注释,不支持 ** 和否定模式) - patterns = [] - for line in lines: - line = line.strip() - if not line or line.startswith("#"): - continue - # 移除末尾的注释(# 后面的内容) - if "#" in line: - line = line.split("#", 1)[0].strip() - if line: - patterns.append(line) - return patterns - - def _is_ignored_by_gitignore(self, rel_path: str, gitignore_matcher) -> bool: - """ - 判断相对路径是否被 .gitignore 忽略。 - gitignore_matcher 可以是 PathSpec 对象或简单模式列表。 - """ - if gitignore_matcher is None: - return False - - # 将路径转换为 POSIX 风格(使用 / 分隔符) - rel_path = rel_path.replace(os.sep, "/") - - if HAS_PATHSPEC and isinstance(gitignore_matcher, PathSpec): - return gitignore_matcher.match_file(rel_path) - elif isinstance(gitignore_matcher, list): - # 简单匹配:对于每个模式,如果模式以 / 结尾,则匹配目录;否则匹配文件 - for pattern in gitignore_matcher: - # 处理目录模式(以 / 结尾) - if pattern.endswith("/"): - # 检查路径是否以该目录开头 - if rel_path.startswith(pattern.rstrip("/") + "/") or rel_path == pattern.rstrip("/"): - return True - else: - # 文件/通配符模式,使用 fnmatch - if fnmatch.fnmatch(rel_path, pattern): - return True - return False - - def _filter_files_by_gitignore(self, files: List[Path]) -> List[Path]: - """ - 根据 .gitignore 和硬编码规则(如 .git/)过滤文件列表。 - """ - gitignore_matcher = self._load_gitignore_patterns() - filtered = [] - for file_path in files: - try: - # 计算相对于输出目录的路径 - rel_path = file_path.relative_to(self.output_dir).as_posix() - except ValueError: - # 如果文件不在输出目录下(例如绝对路径),则保留(不过滤) - logger.warning(f"文件 {file_path} 不在输出目录 {self.output_dir} 下,将保留") - filtered.append(file_path) - continue - - # 硬编码忽略 .git 目录 - if rel_path.startswith(".git/") or rel_path == ".git": - logger.debug(f"忽略 .git 目录下的文件: {rel_path}") - continue - - # 检查 .gitignore - if self._is_ignored_by_gitignore(rel_path, gitignore_matcher): - logger.debug(f"忽略 .gitignore 中的文件: {rel_path}") - continue - - filtered.append(file_path) - - return filtered - - def run_check(self, tool: str, file_path: Path) -> Dict[str, Any]: - """ - 运行单个检查工具并返回结果 - - Args: - tool: 检查工具名称(如 'black') - file_path: 要检查的文件路径 - - Returns: - Dict 包含工具名、返回码、stdout、stderr 和错误信息 - """ - logger.debug(f"运行检查工具: {tool} 在文件: {file_path}") - - # 构建命令,根据工具不同调整 - if tool == "black": - cmd = f"black --check --diff {file_path}" - elif tool == "pylint": - cmd = f"pylint {file_path} --output-format=json" - elif tool == "mypy": - cmd = f"mypy {file_path} --show-error-codes --no-error-summary" - else: - # 默认直接运行工具 - cmd = f"{tool} {file_path}" - - try: - result = subprocess.run( - cmd, - shell=True, - cwd=self.output_dir, - capture_output=True, - text=True, - timeout=60, # 1 分钟超时 - ) - - # 解析错误信息 - errors = [] - if result.stderr: - errors.append(result.stderr.strip()) - if result.stdout: - # 对于 pylint 的 JSON 输出,可以进一步解析 - if tool == "pylint" and result.returncode != 0: - try: - pylint_errors = json.loads(result.stdout) - errors.extend([e.get("message", "") for e in pylint_errors]) - except json.JSONDecodeError: - errors.append(result.stdout.strip()) - elif result.returncode != 0: - errors.append(result.stdout.strip()) - - return { - "tool": tool, - "file": str(file_path), - "returncode": result.returncode, - "stdout": result.stdout, - "stderr": result.stderr, - "errors": errors, - } - except subprocess.TimeoutExpired: - logger.error(f"检查工具 {tool} 超时: {cmd}") - return { - "tool": tool, - "file": str(file_path), - "returncode": -1, - "stdout": "", - "stderr": "检查超时", - "errors": ["检查超时"], - } - except Exception as e: - logger.error(f"运行检查工具 {tool} 失败: {e}") - return { - "tool": tool, - "file": str(file_path), - "returncode": -1, - "stdout": "", - "stderr": str(e), - "errors": [str(e)], - } - - def run_parallel_checks(self, files: Optional[List[Path]] = None) -> List[Dict[str, Any]]: - """ - 并行运行检查工具在指定文件上(仅使用配置的第一个工具) - - Args: - files: 要检查的文件路径列表,如果为 None 则自动查找输出目录下所有 .py 文件(排除 .gitignore 中的) - - Returns: - 检查结果列表 - """ - if files is None: - # 递归查找所有 .py 文件 - files = list(self.output_dir.rglob("*.py")) - # 过滤 .gitignore 中的文件 - files = self._filter_files_by_gitignore(files) - - # 只使用第一个工具(已在 __init__ 中保证 self.check_tools 只有一个元素) - tool = self.check_tools[0] - logger.info(f"开始并行检查,文件数: {len(files)},工具: {tool}") - - all_results = [] - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - TimeElapsedColumn(), - TimeRemainingColumn(), - ) as progress: - task = progress.add_task("[cyan]Running parallel checks...", total=len(files)) - with ThreadPoolExecutor(max_workers=min(4, len(files))) as executor: - futures = [executor.submit(self.run_check, tool, file_path) for file_path in files] - - error_count = 0 # 初始化错误计数 - for future in as_completed(futures): - try: - result = future.result() - all_results.append(result) - # 检查并更新错误计数 - if result.get("errors") and result["errors"]: - error_count += len(result["errors"]) - except Exception as e: - logger.error(f"并行检查任务失败: {e}") - finally: - # 更新进度条:前进并更新描述以显示错误统计 - progress.update(task, advance=1, description=f"[cyan]Running parallel checks... Errors: {error_count}") - - # 保存结果到文件 - self.save_results(all_results) - logger.info(f"并行检查完成,总结果数: {len(all_results)},总错误数: {error_count}") - return all_results - - def save_results(self, results: List[Dict[str, Any]]) -> None: - """保存检查结果到 JSON 文件""" - try: - with open(self.results_file, "w", encoding="utf-8") as f: - json.dump(results, f, indent=2, ensure_ascii=False) - logger.debug(f"检查结果已保存至: {self.results_file}") - except Exception as e: - logger.error(f"保存检查结果失败: {e}") - - def collect_errors(self, results: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]: - """ - 从检查结果中收集所有错误 - - Args: - results: 检查结果列表,如果为 None 则从文件加载 - - Returns: - 错误列表,每个错误包含文件、工具和错误信息 - """ - if results is None: - if self.results_file.exists(): - try: - with open(self.results_file, "r", encoding="utf-8") as f: - results = json.load(f) - except Exception as e: - logger.error(f"加载检查结果失败: {e}") - return [] - else: - logger.warning("无检查结果文件,先运行检查") - return [] - - errors = [] - for result in results: - if result.get("errors") and result["errors"]: - for error_msg in result["errors"]: - if error_msg: # 跳过空错误 - errors.append({ - "file": result["file"], - "tool": result["tool"], - "error": error_msg, - }) - logger.info(f"收集到 {len(errors)} 个错误") - return errors - - def auto_fix(self, errors: List[Dict[str, Any]], context_files: Optional[List[str]] = None) -> bool: - """ - 自动调用 LLM 生成修复补丁并应用 - - Args: - errors: 错误列表,来自 collect_errors - context_files: 上下文文件路径列表,用于 LLM 生成修复 - - Returns: - bool: 修复是否成功(至少修复了一个错误) - """ - if not errors: - logger.info("没有错误需要修复") - return True - - logger.info(f"开始自动修复 {len(errors)} 个错误") - - # 准备上下文:包括 README、design.json 和相关代码文件 - context_content = [] - - # 添加 README(如果存在) - readme_path = self.output_dir / "README.md" - if readme_path.exists(): - with open(readme_path, "r", encoding="utf-8") as f: - context_content.append(f"### 项目 README ###\n{f.read()}\n") - - # 添加 design.json(如果存在) - design_path = self.output_dir / "design.json" - if design_path.exists(): - with open(design_path, "r", encoding="utf-8") as f: - context_content.append(f"### 设计文件: design.json ###\n{f.read()}\n") - - # 添加错误相关的代码文件 - if context_files is None: - context_files = list(set(error["file"] for error in errors)) - for file_path in context_files: - path = Path(file_path) - if not path.exists(): - path = self.output_dir / file_path - if path.exists(): - with open(path, "r", encoding="utf-8") as f: - context_content.append(f"### 文件: {path.name} (路径: {file_path}) ###\n{f.read()}\n") - - # 添加错误信息 - errors_str = json.dumps(errors, indent=2, ensure_ascii=False) - context_content.append(f"### 检查错误列表 ###\n{errors_str}\n") - - full_context = "\n".join(context_content) - - # 调用 LLM 生成修复 - system_prompt = ( - "你是一个专业的编程助手,擅长修复代码错误。根据提供的上下文(包括项目 README、设计文件、相关代码和检查错误)," - "生成修复补丁代码。返回严格的 JSON 对象,包含两个字段:\n" - "- patches: 数组,每个元素是一个对象,包含 'file'(文件路径)和 'code'(修复后的完整代码或差异)\n" - "- description: 简短的中文修复描述\n" - "注意:只修复提到的错误,保持代码风格一致。" - ) - user_prompt = f"请修复以下检查错误:\n\n{full_context}" - - try: - result = self.code_generator._call_llm(system_prompt, user_prompt, temperature=0.1) - patches = result.get("patches", []) - description = result.get("description", "无描述") - logger.info(f"LLM 生成修复补丁: {description}, 补丁数: {len(patches)}") - - # 应用补丁,使用进度条 - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - TimeElapsedColumn(), - TimeRemainingColumn(), - ) as progress: - task = progress.add_task("[cyan]Applying fixes...", total=len(patches)) - success_count = 0 - for patch in patches: - file_path = patch.get("file") - code = patch.get("code") - if not file_path or not code: - logger.warning(f"无效补丁: {patch}") - progress.update(task, advance=1) - continue - - full_path = self.output_dir / file_path - try: - with open(full_path, "w", encoding="utf-8") as f: - f.write(code) - logger.info(f"已应用修复到文件: {file_path}") - success_count += 1 - except Exception as e: - logger.error(f"应用修复失败到文件 {file_path}: {e}") - finally: - progress.update(task, advance=1) - - logger.info(f"自动修复完成,成功修复 {success_count}/{len(patches)} 个补丁") - return success_count > 0 - except Exception as e: - logger.error(f"调用 LLM 生成修复失败: {e}") - return False - - def run_full_check_and_fix(self, max_retries: int = 3) -> bool: - """ - 运行完整检查与修复循环,直到无错误或达到最大重试次数 - - Args: - max_retries: 最大修复重试次数 - - Returns: - bool: 是否成功(无错误或修复后无错误) - """ - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), - TimeElapsedColumn(), - TimeRemainingColumn(), - ) as progress: - task = progress.add_task("[cyan]Full check and fix cycle", total=max_retries) - for attempt in range(max_retries): - progress.update(task, description=f"[cyan]Attempt {attempt + 1}/{max_retries}") - logger.info(f"检查与修复循环,尝试 {attempt + 1}/{max_retries}") - - # 运行并行检查 - results = self.run_parallel_checks() - errors = self.collect_errors(results) - - if not errors: - progress.update(task, completed=max_retries) - logger.success("所有检查通过,无错误") - return True - - logger.warning(f"发现 {len(errors)} 个错误,尝试自动修复") - success = self.auto_fix(errors) - if not success: - logger.error(f"第 {attempt + 1} 次修复失败") - progress.update(task, advance=1) - if attempt == max_retries - 1: - return False - else: - logger.info(f"第 {attempt + 1} 次修复成功,重新检查") - progress.update(task, advance=1) - - # 最后一次检查 - progress.update(task, description="[cyan]Final check...") - results = self.run_parallel_checks() - errors = self.collect_errors(results) - if errors: - logger.error(f"修复后仍有 {len(errors)} 个错误") - return False - else: - logger.success("修复后所有检查通过") - return True diff --git a/src/llm_codegen/cli.py b/src/llm_codegen/cli.py index 380478b..9172bde 100644 --- a/src/llm_codegen/cli.py +++ b/src/llm_codegen/cli.py @@ -17,7 +17,7 @@ from .core import BaseGenerator from .init_generator import InitGenerator from .enhance_generator import EnhanceGenerator from .fix_generator import FixGenerator -from .checker import Checker +from .design_generator import DesignGenerator # 新增导入DesignGenerator app = typer.Typer(help="基于LLM的自动化代码生成与维护工具") console = Console() @@ -129,14 +129,6 @@ def enhance( f"design.json 不存在于 {output_dir},请先运行 init 命令初始化项目。" ) raise typer.Exit(code=1) - - # 读取工单文件 - try: - with open(issue_file, "r", encoding="utf-8") as f: - issue_content = f.read() - except Exception as e: - logger.error(f"读取工单文件失败: {e}") - raise typer.Exit(code=1) with Progress( SpinnerColumn(), @@ -201,13 +193,6 @@ def fix( logger.error(f"design.json 不存在于 {output_dir},请确保项目已初始化。") raise typer.Exit(code=1) - # 读取工单文件 - try: - with open(issue_file, "r", encoding="utf-8") as f: - issue_content = f.read() - except Exception as e: - logger.error(f"读取工单文件失败: {e}") - raise typer.Exit(code=1) try: with Progress( SpinnerColumn(), @@ -240,19 +225,28 @@ def fix( @app.command() def design( - file: Path = typer.Option( - ..., + file: Optional[Path] = typer.Option( + None, "--file", "-f", - help="README文件路径,用于生成design.json", + help="README文件路径,用于生成design.json;如果与--source同时使用,则生成design.json后从源代码刷新", exists=True, file_okay=True, dir_okay=False, ), + source: Optional[Path] = typer.Option( + None, + "--source", + help="源代码目录路径,用于从源代码刷新design.json;必须为目录", + exists=True, + file_okay=False, + dir_okay=True, + ), output_dir: Optional[Path] = typer.Option( None, "--output", "-o", help="输出目录,design.json将保存在此,默认为当前目录" ), - force: bool = typer.Option(False, "--force", help="强制覆盖已存在的design.json"), + force: bool = typer.Option(False, "--force", help="强制覆盖已存在的design.json,或强制从源代码刷新"), + dry_run: bool = typer.Option(False, "--dry-run", help="模拟运行,不实际写入文件或执行命令,仅打印信息"), api_key: Optional[str] = typer.Option( None, "--api-key", envvar="DEEPSEEK_APIKEY", help="API密钥" ), @@ -265,21 +259,19 @@ def design( 4, "--max-concurrency", help="并发生成的最大工作线程数,默认4" ), ): - """生成或更新design.json:根据README文件生成中间设计文件,不生成完整代码。""" + """生成或更新design.json:支持从README生成、从源代码刷新,并集成新的设计生成逻辑。""" if output_dir is None: output_dir = Path.cwd() # 初始化日志配置 log_file_path = init_logging(output_dir, log_file, command_name="design") - # 检查design.json是否存在并处理强制覆盖 - design_path = output_dir / "design.json" - if not force and design_path.exists(): - logger.error( - f"design.json 已存在于 {design_path}。使用 --force 参数以强制覆盖。" - ) + # 检查是否提供了至少一个操作参数 + if file is None and source is None: + logger.error("必须提供 --file 或 --source 参数之一来执行操作。") raise typer.Exit(code=1) + # 初始化DesignGenerator以集成新的设计生成逻辑 try: with Progress( SpinnerColumn(), @@ -287,8 +279,8 @@ def design( BarColumn(), console=console, ) as progress: - task_id = progress.add_task("正在生成design.json...", total=1) # 可选:保持现有风格,但工单未要求修改此命令 - generator = BaseGenerator( + task_id = progress.add_task("正在处理design命令...", total=1) + generator = DesignGenerator( api_key=api_key, base_url=base_url, model=model, @@ -296,61 +288,51 @@ def design( log_file=log_file_path, max_concurrency=max_concurrency, ) - # 解析README文件并设置内容 - generator.readme_content = generator.parse_readme(file) - # 生成design.json - generator.generate_design_json() - progress.update(task_id, completed=1, description="design.json 生成完成") # 可选:更新完成状态 - console.print(f"[green]✅ design.json 已生成在 {design_path}[/green]") + + design_path = output_dir / "design.json" + + # 处理--dry-run选项 + if dry_run: + console.print("[yellow]模拟运行模式:不会实际写入文件或执行命令。[/yellow]") + if file is not None: + logger.info(f"模拟:将从README文件 {file} 生成design.json") + # 在dry-run模式下,仅模拟解析README + content = generator.parse_readme(file) + console.print(f"[blue]模拟解析README内容完成,长度: {len(content)} 字符[/blue]") + if source is not None: + logger.info(f"模拟:将从源代码目录 {source} 刷新design.json") + # 模拟分析源代码 + design_info = generator.analyze_source_files(source) + console.print(f"[blue]模拟分析完成,共分析 {len(design_info['files'])} 个文件[/blue]") + progress.update(task_id, completed=1, description="模拟运行完成") + console.print("[green]✅ 模拟运行完成,无实际文件操作。[/green]") + return + + # 实际运行逻辑 + if file is not None: + # 生成design.json从README + if not force and design_path.exists(): + logger.error( + f"design.json 已存在于 {design_path}。使用 --force 参数以强制覆盖。" + ) + raise typer.Exit(code=1) + generator.run(readme_path=file) + logger.info(f"已从README生成design.json: {design_path}") + + if source is not None: + # 从源代码刷新design.json + success = generator.refresh_design_from_source(source) + if not success: + logger.error("从源代码刷新design.json失败") + raise typer.Exit(code=1) + logger.info(f"已从源代码刷新design.json: {design_path}") + + progress.update(task_id, completed=1, description="design命令处理完成") + console.print(f"[green]✅ design.json 已处理完成,路径: {design_path}[/green]") except Exception as e: - logger.error(f"生成design.json失败: {e}") - raise typer.Exit(code=1) - - -@app.command() -def check( - output_dir: Optional[Path] = typer.Option( - None, "--output", "-o", help="项目根目录,默认为当前目录" - ), - api_key: Optional[str] = typer.Option( - None, "--api-key", envvar="DEEPSEEK_APIKEY", help="API密钥" - ), - base_url: str = typer.Option( - "https://api.deepseek.com", "--base-url", help="API基础URL" - ), - model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), - log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), - max_retries: int = typer.Option(3, "--max-retries", help="最大修复重试次数"), - max_concurrency: int = typer.Option( - 4, "--max-concurrency", help="并发生成的最大工作线程数,默认4" - ), -): - """运行代码检查和自动修复(不依赖于工单)""" - if output_dir is None: - output_dir = Path.cwd() - - # 初始化日志配置 - log_file_path = init_logging(output_dir, log_file, command_name="check") - - try: - generator = BaseGenerator( - api_key=api_key, - base_url=base_url, - model=model, - output_dir=str(output_dir), - log_file=log_file_path, - max_concurrency=max_concurrency, - ) - checker = Checker(output_dir=output_dir, code_generator=generator) - success = checker.run_full_check_and_fix(max_retries=max_retries) - if not success: - logger.error("检查修复失败") - raise typer.Exit(code=1) - console.print("[green]检查与修复完成。详情请查看日志。[/green]") - except Exception as e: - logger.error(f"检查失败: {e}") + logger.error(f"处理design命令失败: {e}") raise typer.Exit(code=1) if __name__ == "__main__": - app() + app() \ No newline at end of file diff --git a/src/llm_codegen/core.py b/src/llm_codegen/core.py index e3d029c..95d569d 100644 --- a/src/llm_codegen/core.py +++ b/src/llm_codegen/core.py @@ -15,11 +15,15 @@ from loguru import logger from openai import OpenAI from .utils import is_dangerous_command -from .models import DesignModel, StateModel, FileModel, FileStatus # 添加 FileStatus 导入 -from .diff_applier import parse_diff, apply_diff +from .models import ( + DesignModel, + StateModel, + FileModel, + FileStatus, +) # 添加 FileStatus 导入 -class BaseGenerator: +class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" def __init__( @@ -29,7 +33,7 @@ class BaseGenerator: model: str = "deepseek-reasoner", output_dir: str = "./generated", log_file: Optional[str] = None, - max_concurrency: int = 4 + max_concurrency: int = 4, ): """ 初始化生成器 @@ -118,12 +122,12 @@ class BaseGenerator: "system_prompt": system_prompt, "user_prompt": user_prompt, "temperature": temperature, - "expect_json": expect_json + "expect_json": expect_json, } - + with open(response_file, "w", encoding="utf-8") as f: json.dump(response_data, f, indent=2, ensure_ascii=False) - + logger.debug(f"LLM原始响应: {response_file.name}") if expect_json: @@ -142,7 +146,6 @@ class BaseGenerator: self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") raise - def parse_readme(self, readme_path: Path) -> str: """ 读取README文件内容 @@ -168,17 +171,17 @@ class BaseGenerator: "返回严格的 JSON 对象,符合DesignModel结构。" ) user_prompt = f"README内容如下:\n\n{self.readme_content}" - + result = self._call_llm(system_prompt, user_prompt) design_data = result design = DesignModel(**design_data) - + # 写入design.json文件 design_path = self.output_dir / "design.json" with open(design_path, "w", encoding="utf-8") as f: json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) logger.info(f"已生成design.json: {design_path}") - + return design def load_state(self) -> Optional[StateModel]: @@ -188,7 +191,9 @@ class BaseGenerator: with open(self.state_file, "r", encoding="utf-8") as f: state_data = json.load(f) self.state = StateModel(**state_data) - logger.info(f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个") + logger.info( + f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个" + ) return self.state except Exception as e: logger.error(f"加载状态失败: {e}") @@ -196,7 +201,9 @@ class BaseGenerator: return None return None - def save_state(self, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None: + def save_state( + self, generated_files: List[str], dependencies_map: Dict[str, List[str]] + ) -> None: """保存断点续写状态,适应并发生成(线程安全)""" with self._state_lock: # 串行化写入 state = StateModel( @@ -205,13 +212,12 @@ class BaseGenerator: dependencies_map=dependencies_map, total_files=len(self.design.files) if self.design else 0, output_dir=str(self.output_dir), - readme_path=self.readme_content[:100] if self.readme_content else "" + readme_path=self.readme_content[:100] if self.readme_content else "", ) with open(self.state_file, "w", encoding="utf-8") as f: json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) logger.debug(f"状态已保存: {self.state_file}") - def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: """ 从design.json获取文件列表和依赖关系 @@ -223,24 +229,26 @@ class BaseGenerator: """ if not self.design: raise ValueError("design.json未加载,请先调用generate_design_json") - + files = [file.path for file in self.design.files] dependencies = {file.path: file.dependencies for file in self.design.files} - + logger.info(f"从design.json解析到 {len(files)} 个待生成文件") logger.debug(f"文件列表: {files}") logger.debug(f"依赖关系: {dependencies}") - + return files, dependencies - def _add_implicit_dependencies(self, files: List[str], dependencies: Dict[str, List[str]]) -> Dict[str, List[str]]: + def _add_implicit_dependencies( + self, files: List[str], dependencies: Dict[str, List[str]] + ) -> Dict[str, List[str]]: """ 添加隐式依赖关系,基于文件路径和常见模式 - + Args: files: 文件路径列表 dependencies: 原始依赖字典 - + Returns: Dict[str, List[str]]: 增强后的依赖字典 """ @@ -251,75 +259,17 @@ class BaseGenerator: # 添加同一目录下的其他文件作为隐式依赖(简单示例) path = Path(file) implicit_deps = [ - f for f in files - if f != file and Path(f).parent == path.parent and f not in enhanced[file] + f + for f in files + if f != file + and Path(f).parent == path.parent + and f not in enhanced[file] ] if implicit_deps: enhanced[file].extend(implicit_deps) logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}") return enhanced - def _apply_diff(self, diff: str, original_content: str) -> str: - """ - 应用 unified diff 到原始内容,返回修改后的内容。 - - Args: - diff: 字符串形式的 unified diff - original_content: 原始文件内容 - - Returns: - str: 应用 diff 后的内容 - - Raises: - Exception: 如果应用 diff 失败 - """ - try: - # 解析 diff 行 - diff_lines = diff.splitlines(keepends=True) - if not diff_lines: - raise ValueError("diff 为空") - - # 简单的 diff 应用逻辑:假设 diff 是标准 unified diff,逐行处理 - # 注意:这是一个简化实现,对于复杂 diff 可能不准确,建议使用专用库如 `patch` - original_lines = original_content.splitlines(keepends=True) - result_lines = [] - i = 0 - j = 0 - while i < len(diff_lines): - line = diff_lines[i] - if line.startswith('--- ') or line.startswith('+++ '): - i += 1 - continue - elif line.startswith('@@ '): - i += 1 - continue - elif line.startswith(' '): - # 未修改行 - if j < len(original_lines): - result_lines.append(original_lines[j]) - j += 1 - i += 1 - elif line.startswith('-'): - # 删除行 - j += 1 - i += 1 - elif line.startswith('+'): - # 新增行 - result_lines.append(line[1:]) - i += 1 - else: - i += 1 # 跳过未知行 - - # 添加剩余原始行 - while j < len(original_lines): - result_lines.append(original_lines[j]) - j += 1 - - return ''.join(result_lines) - except Exception as e: - logger.error(f"应用 diff 时出错: {e}") - raise RuntimeError(f"无法应用 diff: {e}") - def generate_file( self, file_path: str, @@ -330,32 +280,34 @@ class BaseGenerator: ) -> Tuple[str, str, List[str]]: """ 生成单个文件,返回 (代码, 描述, 命令列表) - + Args: file_path: 目标文件路径 prompt_instruction: 生成指令 dependency_files: 依赖文件列表(用于上下文) existing_content: 文件现有内容(若为修改模式) - output_format: 输出格式,'full' 或 'diff',来自 models.py + output_format: 输出格式,'full',来自 models.py """ # 收集上下文内容 context_content = [] if self.readme_content: context_content.append(f"### 项目 README ###\n{self.readme_content}\n") - + # 添加 design.json 上下文 design_path = self.output_dir / "design.json" if design_path.exists(): try: with open(design_path, "r", encoding="utf-8") as f: design_content = f.read() - context_content.append(f"### 设计文件: design.json ###\n{design_content}\n") + context_content.append( + f"### 设计文件: design.json ###\n{design_content}\n" + ) except Exception as e: logger.error(f"读取design.json失败: {e}") self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]") # 如果design.json读取失败,可能无法继续,但保持上下文为空或部分 - + # 添加依赖文件内容(仅读取存在的文件) for dep in dependency_files: dep_path = Path(dep) @@ -365,116 +317,94 @@ class BaseGenerator: dep_path = alt_path else: logger.warning(f"依赖文件不存在,已跳过: {dep}") - self.console.print(f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]") + self.console.print( + f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]" + ) continue try: with open(dep_path, "r", encoding="utf-8") as f: content = f.read() - context_content.append(f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n") + context_content.append( + f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n" + ) except Exception as e: logger.error(f"读取依赖文件 {dep} 失败: {e}") - self.console.print(f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]") + self.console.print( + f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]" + ) # 跳过此依赖文件 # 如果有现有内容,也加入上下文 if existing_content is not None: - context_content.append(f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n") + context_content.append( + f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n" + ) full_context = "\n".join(context_content) - # 根据 output_format 设置 system_prompt - if output_format == "diff": - if existing_content is None: - logger.error("对于 output_format='diff',必须提供 existing_content") - self.console.print("[bold red]❌ 对于 output_format='diff',必须提供 existing_content[/bold red]") - return "# 错误:缺少现有内容", "生成失败,缺少现有内容", [] + # output_format 为 'full' 或其他,保持现有逻辑 + if existing_content is not None: system_prompt = ( - "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成文件的差异(diff)。" + "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。" "返回严格的 JSON 对象,包含四个字段:\n" - "- diff: (string) 文件的差异,使用 unified diff 格式\n" + "- code: (string) 修改后的完整代码\n" "- description: (string) 简短的中文修改描述\n" - "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表,若无则返回空数组\n" - "- output_format: (string) 应为 'diff'" + "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" + "- output_format: (string) 应为 'full'" ) else: - # output_format 为 'full' 或其他,保持现有逻辑 - if existing_content is not None: - system_prompt = ( - "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。" - "返回严格的 JSON 对象,包含四个字段:\n" - "- code: (string) 修改后的完整代码\n" - "- description: (string) 简短的中文修改描述\n" - "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" - "- output_format: (string) 应为 'full'" - ) - else: - system_prompt = ( - "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。" - "返回严格的 JSON 对象,包含四个字段:\n" - "- code: (string) 生成的完整代码\n" - "- description: (string) 简短的中文功能描述\n" - "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" - "- output_format: (string) 应为 'full'" - ) + system_prompt = ( + "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。" + "返回严格的 JSON 对象,包含四个字段:\n" + "- code: (string) 生成的完整代码\n" + "- description: (string) 简短的中文功能描述\n" + "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" + "- output_format: (string) 应为 'full'" + ) user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" - if output_format == "diff": - user_prompt += f"\noutput_format: {output_format}" try: result = self._call_llm(system_prompt, user_prompt) - # 解析响应,假设包含 output_format 字段 - if output_format == "diff": - diff = result.get("diff") - description = result.get("description", "") - commands = result.get("commands", []) - result.get("output_format", "diff") - if diff is None: - raise ValueError("LLM 响应中没有 diff 字段") - # 调用 diff_applier 应用 diff - try: - chunks = parse_diff(diff) - code, conflicts = apply_diff(existing_content, chunks) - if conflicts: - logger.warning(f"应用diff时发现冲突: {conflicts}") - # 可以记录冲突,但继续处理 - except Exception as e: - logger.error(f"应用 diff 时发生意外错误: {e}") - self.console.print(f"[bold red]❌ 应用 diff 时发生意外错误: {e}[/bold red]") - return "# 应用 diff 失败", f"应用 diff 时发生意外错误: {e}", [] - return code, description, commands - else: - code = result.get("code") - description = result.get("description", "") - commands = result.get("commands", []) - result.get("output_format", "full") - if code is None: - raise ValueError("LLM 响应中没有 code 字段") - return code, description, commands + code = result.get("code") + description = result.get("description", "") + commands = result.get("commands", []) + result.get("output_format", "full") + if code is None: + raise ValueError("LLM 响应中没有 code 字段") + return code, description, commands except Exception as e: logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") - self.console.print(f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]") + self.console.print( + f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]" + ) # 返回默认值以便继续 return "# 生成失败,请检查日志", "生成失败,发生错误", [] - def _generate_file_task(self, file_path: str, dependencies: List[str], generated_files: set) -> Tuple[bool, str]: + def _generate_file_task( + self, file_path: str, dependencies: List[str], generated_files: set + ) -> Tuple[bool, str]: """ 并发任务函数,用于生成单个文件 - + Args: file_path: 文件路径 dependencies: 依赖文件列表 generated_files: 已生成文件的集合(用于上下文) - + Returns: Tuple[bool, str]: (是否成功, 错误信息或空字符串) """ try: - instruction = f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" + instruction = ( + f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" + ) # 过滤依赖文件,只使用已生成的 available_deps = [dep for dep in dependencies if dep in generated_files] - code, desc, commands = self.generate_file(file_path, instruction, available_deps) + code, desc, commands = self.generate_file( + file_path, instruction, available_deps + ) logger.info(f"生成完成: {file_path} - {desc}") # 写入文件 @@ -495,7 +425,9 @@ class BaseGenerator: logger.error(f"生成文件 {file_path} 失败: {e}") return False, str(e) - def _topological_sort(self, files: List[str], dependencies: Dict[str, List[str]]) -> List[str]: + def _topological_sort( + self, files: List[str], dependencies: Dict[str, List[str]] + ) -> List[str]: """ 对文件列表进行拓扑排序,基于依赖关系。 返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。 @@ -510,9 +442,9 @@ class BaseGenerator: # 构建图:如果文件f依赖于dep,则增加f的入度,并将f加入rev_graph[dep] for f in files: for dep in dependencies.get(f, []): - if dep in files: # 只考虑在files中的依赖 - in_degree[f] += 1 # f依赖于dep,所以f的入度增加 - rev_graph[dep].append(f) # dep被f依赖 + if dep in files: # 只考虑在files中的依赖 + in_degree[f] += 1 # f依赖于dep,所以f的入度增加 + rev_graph[dep].append(f) # dep被f依赖 # 队列初始化为入度为0的文件(无依赖的文件) queue = deque([f for f in files if in_degree[f] == 0]) @@ -529,7 +461,9 @@ class BaseGenerator: # 检查是否所有文件都已排序(无循环依赖) if len(sorted_files) != len(files): - raise ValueError(f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。") + raise ValueError( + f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。" + ) return sorted_files @@ -543,7 +477,9 @@ class BaseGenerator: dangerous, reason = is_dangerous_command(cmd) if dangerous: logger.error(f"危险命令被阻止: {cmd},原因: {reason}") - self.console.print(f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]") + self.console.print( + f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]" + ) return False logger.info(f"执行命令: {cmd}") @@ -563,7 +499,9 @@ class BaseGenerator: logger.warning(f"stderr: {result.stderr[:500]}") if result.returncode != 0: logger.error(f"命令执行失败,返回码: {result.returncode}") - self.console.print(f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]") + self.console.print( + f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]" + ) return False return True except subprocess.TimeoutExpired: @@ -592,6 +530,16 @@ class BaseGenerator: ) # 将现有 design.json 内容作为上下文的一部分 + if not self.design: + design_path = self.output_dir / "design.json" + try: + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) + except Exception as e: + logger.error(f"加载design.json失败: {e}") + self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") + raise e design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False) user_prompt = ( f"工单类型: {issue_type}\n" @@ -602,7 +550,9 @@ class BaseGenerator: result = self._call_llm(system_prompt, user_prompt, temperature=0.2) return result - def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]): + def _update_design( + self, generated_files: List[str], design_updates: Dict[str, Any] + ): """ 根据生成的变更更新 design.json 使用 FileModel 来处理文件信息 @@ -616,7 +566,7 @@ class BaseGenerator: if not exists: # 获取更新信息 update_info = design_updates.get(file_path, {}) - + # 创建新文件条目(FileModel实例) new_file = FileModel( path=file_path, @@ -624,7 +574,7 @@ class BaseGenerator: dependencies=update_info.get("dependencies", []), functions=update_info.get("functions", []), classes=update_info.get("classes", []), - design_updates=update_info.get("design_updates", {}) + design_updates=update_info.get("design_updates", {}), ) self.design.files.append(new_file) updated = True @@ -654,13 +604,17 @@ class BaseGenerator: self.readme_content = self.parse_readme(readme_path) except Exception as e: logger.error(f"读取README.md失败,无法刷新design: {e}") - self.console.print(f"[bold red]❌ 读取README.md失败,无法刷新design: {e}[/bold red]") + self.console.print( + f"[bold red]❌ 读取README.md失败,无法刷新design: {e}[/bold red]" + ) return False else: logger.error("没有README内容,且README.md文件不存在,无法刷新design") - self.console.print("[bold red]❌ 没有README内容,且README.md文件不存在,无法刷新design[/bold red]") + self.console.print( + "[bold red]❌ 没有README内容,且README.md文件不存在,无法刷新design[/bold red]" + ) return False - + try: self.design = self.generate_design_json() logger.info("design.json已成功重新生成") @@ -682,7 +636,9 @@ class BaseGenerator: design_path = self.output_dir / "design.json" if not design_path.exists(): logger.error(f"design.json不存在于 {self.output_dir}") - self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]") + self.console.print( + f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" + ) return False try: with open(design_path, "r", encoding="utf-8") as f: @@ -692,8 +648,8 @@ class BaseGenerator: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") return False - - # 调用LLM分析文件内容,返回更新信息 + + # 调用LLM分析文件内容,返回更新信息,增强以支持design_updates字段 system_prompt = ( "你是一个软件架构师。分析给定的文件内容,并返回对design.json中该文件条目的更新。" "返回严格的JSON对象,包含以下字段:\n" @@ -701,6 +657,7 @@ class BaseGenerator: "- dependencies: 依赖文件列表\n" "- functions: 函数列表,每个对象有name, summary, inputs, outputs\n" "- classes: 类列表,每个对象有name, summary, methods\n" + "- design_updates: 可选,设计更新字典\n" "注意:仅返回JSON,不要其他文本。" ) # 准备当前design.json中该文件的条目信息 @@ -710,11 +667,11 @@ class BaseGenerator: current_entry = f.model_dump() break user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目(如果存在):\n{json.dumps(current_entry, indent=2) if current_entry else '无'}" - + try: result = self._call_llm(system_prompt, user_prompt, temperature=0.2) update_info = result - + # 查找或创建文件条目 file_model = None for f in self.design.files: @@ -722,30 +679,40 @@ class BaseGenerator: file_model = f break if file_model is None: - # 创建新条目 - file_model = FileModel( + # 创建新条目,包括design_updates + new_file = FileModel( path=file_path, summary=update_info.get("summary", ""), dependencies=update_info.get("dependencies", []), functions=update_info.get("functions", []), - classes=update_info.get("classes", []) + classes=update_info.get("classes", []), + design_updates=update_info.get("design_updates", {}), # 新增design_updates处理 ) - self.design.files.append(file_model) + self.design.files.append(new_file) logger.info(f"在design.json中创建了新文件条目: {file_path}") else: - # 更新现有条目 + # 更新现有条目,使用merge_design_updates处理design_updates + if 'design_updates' in update_info: + file_model.merge_design_updates(update_info['design_updates']) + # 更新其他字段 file_model.summary = update_info.get("summary", file_model.summary) - file_model.dependencies = update_info.get("dependencies", file_model.dependencies) - file_model.functions = update_info.get("functions", file_model.functions) + file_model.dependencies = update_info.get( + "dependencies", file_model.dependencies + ) + file_model.functions = update_info.get( + "functions", file_model.functions + ) file_model.classes = update_info.get("classes", file_model.classes) logger.info(f"更新了design.json中的文件条目: {file_path}") - + # 保存更新后的design.json design_path = self.output_dir / "design.json" with open(design_path, "w", encoding="utf-8") as f: json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) logger.info(f"design.json已更新,文件条目: {file_path}") - self.console.print(f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]") + self.console.print( + f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]" + ) return True except Exception as e: logger.error(f"更新文件条目失败: {e}") @@ -762,7 +729,9 @@ class BaseGenerator: readme_path = self.output_dir / "README.md" if not readme_path.exists(): logger.error(f"README.md不存在于 {self.output_dir}") - self.console.print(f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]") + self.console.print( + f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]" + ) return False try: with open(readme_path, "r", encoding="utf-8") as f: @@ -771,12 +740,14 @@ class BaseGenerator: logger.error(f"读取README.md失败: {e}") self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") return False - + # 加载design.json design_path = self.output_dir / "design.json" if not design_path.exists(): logger.error(f"design.json不存在于 {self.output_dir}") - self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]") + self.console.print( + f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" + ) return False try: with open(design_path, "r", encoding="utf-8") as f: @@ -786,7 +757,7 @@ class BaseGenerator: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") return False - + # 调用LLM比较和同步 system_prompt = ( "你是一个软件架构师。比较README.md内容和design.json,识别不一致之处,并建议更新。" @@ -797,15 +768,17 @@ class BaseGenerator: "注意:仅返回JSON,不要其他文本。" ) user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}" - + try: result = self._call_llm(system_prompt, user_prompt, temperature=0.2) needs_update = result.get("needs_update", False) if not needs_update: logger.info("README.md和design.json已同步,无需更新") - self.console.print("[green]✅ README.md和design.json已同步,无需更新[/green]") + self.console.print( + "[green]✅ README.md和design.json已同步,无需更新[/green]" + ) return True - + update_type = result.get("update_type", "") updates = result.get("updates", {}) if update_type == "readme": @@ -836,10 +809,12 @@ class BaseGenerator: self.console.print("[green]✅ README.md和design.json已同步更新[/green]") else: logger.warning(f"未知的update_type: {update_type}") - self.console.print(f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]") + self.console.print( + f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]" + ) return False return True except Exception as e: logger.error(f"同步README.md失败: {e}") self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") - return False \ No newline at end of file + return False diff --git a/src/llm_codegen/design_generator.py b/src/llm_codegen/design_generator.py new file mode 100644 index 0000000..d2bf739 --- /dev/null +++ b/src/llm_codegen/design_generator.py @@ -0,0 +1,244 @@ +import json +from pathlib import Path +from typing import Optional, Dict, Any, List +import sys +from loguru import logger +from rich.console import Console + +from .core import CodeGenerator +from .models import DesignModel, FileModel, FileStatus, LLMResponse + + +class DesignGenerator(CodeGenerator): + """设计生成器,专门处理设计文件的生成和增量更新逻辑。""" + + def __init__( + self, + api_key: Optional[str] = None, + base_url: str = "https://api.deepseek.com", + model: str = "deepseek-reasoner", + output_dir: str = "./generated", + log_file: Optional[str] = None, + max_concurrency: int = 4, + ): + """初始化设计生成器。 + + Args: + api_key: OpenAI API密钥,默认从环境变量DEEPSEEK_APIKEY读取 + base_url: API基础URL + model: 使用的模型 + output_dir: 输出根目录 + log_file: 日志文件路径,默认自动生成 + max_concurrency: 最大并发数 + """ + super().__init__( + api_key=api_key, + base_url=base_url, + model=model, + output_dir=output_dir, + log_file=log_file, + max_concurrency=max_concurrency, + ) + self.console = Console() + logger.info("DesignGenerator 初始化完成") + + def process_design_command(self, issue_content: str, issue_type: str = "design") -> bool: + """处理设计命令,基于工单内容实现设计文件的生成和增量更新逻辑。 + + 包括源文件分析、LLM调用和合并/覆盖处理。 + + Args: + issue_content: 工单内容字符串 + issue_type: 工单类型,默认为 "design" + + Returns: + bool: 是否成功处理 + """ + logger.info(f"开始处理设计命令,工单类型: {issue_type}") + try: + # 1. 源文件分析:分析工单以获取变更计划 + analysis_result = self._analyze_issue(issue_content, issue_type) + affected_files = analysis_result.get("affected_files", []) + design_updates = analysis_result.get("design_updates", {}) + logger.debug(f"分析结果 - 受影响文件: {affected_files}, 设计更新: {design_updates}") + + # 2. LLM调用和文件生成/更新 + generated_files: List[str] = [] + for file_info in affected_files: + file_path = file_info["path"] + action = file_info.get("action", "create") + description = file_info.get("description", "") + dependencies = file_info.get("dependencies", []) + + # 检查文件现有内容以支持增量更新 + existing_content = None + output_path = self.output_dir / file_path + if output_path.exists() and action == "modify": + try: + with open(output_path, "r", encoding="utf-8") as f: + existing_content = f.read() + logger.info(f"文件存在,将进行修改: {file_path}") + except Exception as e: + logger.error(f"读取现有文件失败,视为创建: {e}") + existing_content = None + elif action == "create" and output_path.exists(): + logger.warning(f"文件已存在,但工单要求创建,将覆盖: {file_path}") + # 可以选择保留或覆盖,这里默认覆盖以处理增量 + + # 构建生成指令 + if action == "create": + instruction = f"根据工单分析,创建文件 '{file_path}',描述: {description}" + else: # modify + instruction = f"根据工单分析,修改文件 '{file_path}',描述: {description}" + + # 调用LLM生成代码 + code, desc, commands = self.generate_file( + file_path=file_path, + prompt_instruction=instruction, + dependency_files=dependencies, + existing_content=existing_content, + output_format="full", + ) + logger.info(f"生成文件完成: {file_path} - {desc}") + + # 写入文件,处理合并/覆盖 + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(code) + generated_files.append(file_path) + logger.info(f"已写入文件: {output_path}") + + # 执行相关命令 + for cmd in commands: + self.execute_command(cmd, cwd=self.output_dir) + + # 3. 更新design.json,处理合并/覆盖 + if generated_files or design_updates: + self._update_design(generated_files, design_updates) + logger.info("design.json 已更新") + self.console.print("[green]✅ 设计文件更新完成[/green]") + + logger.info("设计命令处理成功") + return True + except Exception as e: + logger.error(f"处理设计命令失败: {e}") + self.console.print(f"[bold red]❌ 处理设计命令失败: {e}[/bold red]") + return False + + def analyze_source_files(self, source_dir: Path) -> Dict[str, Any]: + """分析源代码目录以提取设计信息,用于生成或刷新design.json。 + + Args: + source_dir: 源代码目录路径 + + Returns: + Dict[str, Any]: 包含提取的设计信息,如文件列表、依赖等 + """ + logger.info(f"开始分析源代码目录: {source_dir}") + if not source_dir.exists(): + logger.error(f"源代码目录不存在: {source_dir}") + raise FileNotFoundError(f"目录不存在: {source_dir}") + + # 收集所有Python文件 + python_files = list(source_dir.rglob("*.py")) + design_info = {"files": [], "dependencies": {}} + + # 简单分析:遍历文件并调用LLM提取信息(可优化为并发) + for file_path in python_files: + rel_path = str(file_path.relative_to(source_dir)) + try: + with open(file_path, "r", encoding="utf-8") as f: + content = f.read() + + # 调用LLM分析单个文件 + system_prompt = ( + "你是一个软件架构师。分析给定的Python文件内容,返回设计信息,包括摘要、依赖、函数和类。" + "返回严格的JSON对象,包含summary、dependencies、functions、classes字段。" + ) + user_prompt = f"文件路径: {rel_path}\n文件内容:\n{content}" + result = self._call_llm(system_prompt, user_prompt, temperature=0.2) + + # 构建文件模型 + file_model = { + "path": rel_path, + "summary": result.get("summary", "自动分析生成"), + "dependencies": result.get("dependencies", []), + "functions": result.get("functions", []), + "classes": result.get("classes", []), + "design_updates": {}, + } + design_info["files"].append(file_model) + design_info["dependencies"][rel_path] = file_model["dependencies"] + logger.debug(f"已分析文件: {rel_path}") + except Exception as e: + logger.error(f"分析文件 {rel_path} 失败: {e}") + # 跳过失败的文件 + + logger.info(f"源代码分析完成,共分析 {len(design_info['files'])} 个文件") + return design_info + + def refresh_design_from_source(self, source_dir: Path) -> bool: + """从源代码目录刷新design.json,基于分析结果。 + + Args: + source_dir: 源代码目录路径 + + Returns: + bool: 是否成功刷新 + """ + logger.info("开始从源代码刷新design.json") + try: + # 分析源代码 + design_info = self.analyze_source_files(source_dir) + + # 构建DesignModel + design = DesignModel( + project_name=self.design.project_name if self.design else "llm-codegen", + version=self.design.version if self.design else "1.0.0", + description=self.design.description if self.design else "基于大语言模型的代码生成工具", + files=[FileModel(**file) for file in design_info["files"]], + commands=self.design.commands if self.design else [], + check_tools=self.design.check_tools if self.design else [], + ) + + # 保存design.json + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) + + self.design = design + logger.info(f"design.json 已刷新并保存至: {design_path}") + self.console.print("[green]✅ design.json 已从源代码刷新[/green]") + return True + except Exception as e: + logger.error(f"从源代码刷新design.json失败: {e}") + self.console.print(f"[bold red]❌ 从源代码刷新design.json失败: {e}[/bold red]") + return False + + def run(self, readme_path: Optional[Path] = None, issue_content: Optional[str] = None) -> None: + """主执行流程,用于集成到命令行接口。 + + Args: + readme_path: README文件路径,可选 + issue_content: 工单内容字符串,可选 + """ + if readme_path: + self.readme_content = self.parse_readme(readme_path) + self.design = self.generate_design_json() + logger.info("已从README生成design.json") + + if issue_content: + success = self.process_design_command(issue_content) + if success: + logger.info("设计命令处理完成") + else: + logger.error("设计命令处理失败") + sys.exit(1) + else: + logger.warning("未提供工单内容,仅生成或刷新设计文件") + + +if __name__ == "__main__": + # 示例用法 + generator = DesignGenerator() + generator.run() diff --git a/src/llm_codegen/enhance_generator.py b/src/llm_codegen/enhance_generator.py index 7d6a11d..971640d 100644 --- a/src/llm_codegen/enhance_generator.py +++ b/src/llm_codegen/enhance_generator.py @@ -4,10 +4,10 @@ from typing import Any, Dict, List, Optional from loguru import logger from rich.console import Console -from .core import BaseGenerator +from .core import CodeGenerator -class EnhanceGenerator(BaseGenerator): +class EnhanceGenerator(CodeGenerator): """ 增强生成器类,继承自 BaseGenerator,专门处理 enhance 命令逻辑。 用于根据需求工单(feature.issue)对现有项目进行功能增强。 @@ -73,6 +73,7 @@ class EnhanceGenerator(BaseGenerator): except Exception as e: logger.error(f"分析工单失败: {e}") self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") + raise e return False affected_files = analysis_result.get("affected_files", []) @@ -93,7 +94,7 @@ class EnhanceGenerator(BaseGenerator): sorted_paths = self._topological_sort(file_paths, dependencies) logger.debug(f"拓扑排序结果: {sorted_paths}") except ValueError as e: - logger.error(f"拓扑排序失败,检测到循环依赖: {e}") + logger.error(f"拓扑排序失败,检测到循环依赖: {e}, dependencies: {dependencies}") self.console.print(f"[bold red]❌ 拓扑排序失败,检测到循环依赖: {e}[/bold red]") return False @@ -145,6 +146,18 @@ class EnhanceGenerator(BaseGenerator): existing_content=existing_content, output_format=output_format ) + # 将代码写入文件 + output_path = self.output_dir / file_path + output_path.parent.mkdir(parents=True, exist_ok=True) + try: + with open(output_path, "w", encoding="utf-8") as f: + f.write(code) + logger.info(f"已写入文件: {output_path}") + self.console.print(f"[green]✅ 已写入文件: {file_path}[/green]") + except Exception as e: + logger.error(f"写入文件 {file_path} 失败: {e}") + self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]") + continue # generate_file 内部已写入文件并执行命令 generated_files.append(file_path) logger.info(f"文件处理完成: {file_path} - {desc}") diff --git a/src/llm_codegen/fix_generator.py b/src/llm_codegen/fix_generator.py index 8b6f121..a6ede50 100644 --- a/src/llm_codegen/fix_generator.py +++ b/src/llm_codegen/fix_generator.py @@ -1,11 +1,11 @@ import json from pathlib import Path from typing import List, Optional -from .core import BaseGenerator +from .core import CodeGenerator from .models import OutputFormat -class FixGenerator(BaseGenerator): +class FixGenerator(CodeGenerator): """处理 Bug 修复逻辑的生成器类,继承自 BaseGenerator。""" def __init__(self, **kwargs): diff --git a/src/llm_codegen/models.py b/src/llm_codegen/models.py index 932d42a..54d287d 100644 --- a/src/llm_codegen/models.py +++ b/src/llm_codegen/models.py @@ -42,6 +42,14 @@ class FileModel(BaseModel): classes: List[ClassModel] = Field(default_factory=list) design_updates: Dict[str, Any] = Field(default_factory=dict) + def merge_design_updates(self, updates: Dict[str, Any]) -> None: + """合并设计更新到当前文件模型。 + + 参数: + updates: 一个字典,包含要合并的设计更新。 + """ + self.design_updates.update(updates) + class DesignModel(BaseModel): """设计模型,对应 design.json 的根结构。""" @@ -90,4 +98,4 @@ class LLMResponse(BaseModel): code: str description: str commands: List[str] = Field(default_factory=list) - output_format: OutputFormat = Field(default=OutputFormat.FULL, description="输出格式,可选值为 'full' 或 'diff',默认为 'full'") \ No newline at end of file + output_format: OutputFormat = Field(default=OutputFormat.FULL, description="输出格式,可选值为 'full' 或 'diff',默认为 'full'")