feat(README): 移除并行检查与自修复功能描述,优化项目结构说明

This commit is contained in:
songsenand 2026-03-20 08:41:12 +08:00
parent 42e63f2d93
commit 2f280e3f8d
12 changed files with 622 additions and 846 deletions

View File

@ -11,8 +11,6 @@
- 🐞 **自动 Bug 修复**:通过编写**Bug 工单**(如 `bug.issue`),描述问题现象,工具结合代码和错误信息生成修复补丁。 - 🐞 **自动 Bug 修复**:通过编写**Bug 工单**(如 `bug.issue`),描述问题现象,工具结合代码和错误信息生成修复补丁。
- 🔧 **命令执行**:生成文件后可自动执行建议命令(如安装依赖、运行构建),内置危险命令拦截(执行命令失败不会终止任务,仅记录错误)。 - 🔧 **命令执行**:生成文件后可自动执行建议命令(如安装依赖、运行构建),内置危险命令拦截(执行命令失败不会终止任务,仅记录错误)。
- ✅ **单元测试**:使用 `pytest` 编写测试用例,支持测试覆盖率统计。 - ✅ **单元测试**:使用 `pytest` 编写测试用例,支持测试覆盖率统计。
- 🔍 **并行检查**:生成代码后并行运行多个检查工具(`pylint`、`mypy`、`black` 等),收集错误信息。
- 🔄 **自修复**将检查错误、README、design.json 和相关代码提交给 LLM自动生成修复补丁并应用。
- ⏯️ **断点续写**:生成过程中断后可自动从上次中断处继续,状态保存在 `.llm_generator_state.json` - ⏯️ **断点续写**:生成过程中断后可自动从上次中断处继续,状态保存在 `.llm_generator_state.json`
- 🖥️ **命令行工具**:提供 `llm-codegen` 命令,支持多种操作模式。 - 🖥️ **命令行工具**:提供 `llm-codegen` 命令,支持多种操作模式。
- 📝 **详细日志**所有操作、LLM 响应、错误均通过 `loguru` 记录到文件。 - 📝 **详细日志**所有操作、LLM 响应、错误均通过 `loguru` 记录到文件。
@ -200,52 +198,7 @@ llm-codegen design project_readme.md -o ./my_design
4. 应用变更,更新 `design.json` 中的摘要(如果新增了函数/类)。 4. 应用变更,更新 `design.json` 中的摘要(如果新增了函数/类)。
5. 执行检查与修复。 5. 执行检查与修复。
## 📊 Diff 输出格式支持
工具支持在生成代码变更时输出 diff 格式便于代码审查和集成到版本控制系统。diff 输出以标准 unified diff 格式呈现,适用于 `enhance``fix` 操作。
### 字段描述
diff 输出包含以下关键字段:
- **文件名**:变更的文件路径,如 `src/llm_codegen/core.py`
- **行号**:变更发生的行号范围,使用 `@@` 标记表示。
- **旧代码**:被修改或删除的代码行,以 `-` 开头。
- **新代码**:新增或修改后的代码行,以 `+` 开头。
- **变更类型**:隐含在 diff 中,如添加(只有 `+` 行)、删除(只有 `-` 行)、修改(同时有 `-``+` 行)。
### 使用示例
运行 `llm-codegen enhance``llm-codegen fix` 时,通过 `--diff` 选项启用 diff 输出。例如:
```bash
llm-codegen enhance feature.issue -o ./project --diff
```
输出示例:
```diff
--- a/src/llm_codegen/core.py
+++ b/src/llm_codegen/core.py
@@ -10,7 +10,7 @@
def generate_file(self, file_path, prompt_instruction, dependency_files):
# 生成代码逻辑
code = self._call_llm(...)
- commands = []
+ commands = ["安装依赖"]
return code, commands
```
### 注意事项
- diff 输出功能仅适用于增强(`enhance`)和修复(`fix`)操作,初始化(`init`)操作不产生 diff因为它是从头生成。
- 确保使用支持 diff 格式的工具(如 `git diff`、`diff` 命令)查看和应用变更。
- 如果不需要 diff 输出,可以省略 `--diff` 选项,工具将直接应用变更到文件。
- diff 输出不影响工具的核心功能,仅为可选辅助特性。
### 内部实现
工具在生成 diff 输出后,内部使用 `src/llm_codegen/diff_applier.py` 模块来解析和应用 diff 到代码文件。该模块负责读取 diff 格式,验证变更,并安全地更新文件。开发者可以查看此模块的代码以了解更多细节,例如如何与 LLM 响应集成和确保变更的正确性。
## 📝 工单模板 ## 📝 工单模板
@ -311,11 +264,12 @@ uv pip install -e ".[dev]"
│ └── llm_codegen/ # 主代码包 │ └── llm_codegen/ # 主代码包
│ ├── __init__.py │ ├── __init__.py
│ ├── cli.py # 命令行入口typer │ ├── cli.py # 命令行入口typer
│ ├── core.py # 核心生成逻辑CodeGenerator 类) │ ├── core.py # 核心生成逻辑BaseGenerator 类)
│ ├── checker.py # 并行检查与修复模块 │ ├── enhance_generator.py
│ ├── fix_generator.py
│ ├── init_generator.py
│ ├── utils.py # 工具函数(危险命令判断、文件操作) │ ├── utils.py # 工具函数(危险命令判断、文件操作)
│ └── models.py # 数据模型Pydantic │ └── models.py # 数据模型Pydantic
│ └── diff_applier.py # 应用llm返回的diff
├── tests/ # 单元测试 ├── tests/ # 单元测试
│ ├── __init__.py │ ├── __init__.py
│ ├── test_cli.py │ ├── test_cli.py

View File

@ -3,6 +3,17 @@
"version": "1.0.0", "version": "1.0.0",
"description": "一个基于大语言模型的智能代码生成与维护工具支持自动生成、增量添加功能和自动修复Bug。", "description": "一个基于大语言模型的智能代码生成与维护工具支持自动生成、增量添加功能和自动修复Bug。",
"files": [ "files": [
{
"path": "README.md",
"summary": "项目说明文档,包含项目概述、功能介绍和使用说明",
"dependencies": [
"src/llm_codegen/cli.py"
],
"functions": [],
"classes": [],
"design_updates": {}
},
{ {
"path": "pyproject.toml", "path": "pyproject.toml",
"summary": "项目元数据、依赖配置和脚本入口", "summary": "项目元数据、依赖配置和脚本入口",
@ -14,7 +25,9 @@
{ {
"path": "src/llm_codegen/__init__.py", "path": "src/llm_codegen/__init__.py",
"summary": "包初始化文件", "summary": "包初始化文件",
"dependencies": [], "dependencies": [
"src/llm_codegen/core.py"
],
"functions": [], "functions": [],
"classes": [], "classes": [],
"design_updates": {} "design_updates": {}
@ -42,7 +55,6 @@
"summary": "核心生成逻辑包含CodeGenerator类", "summary": "核心生成逻辑包含CodeGenerator类",
"dependencies": [ "dependencies": [
"src/llm_codegen/utils.py", "src/llm_codegen/utils.py",
"src/llm_codegen/diff_applier.py",
"src/llm_codegen/models.py" "src/llm_codegen/models.py"
], ],
"functions": [ "functions": [
@ -127,17 +139,6 @@
], ],
"design_updates": {} "design_updates": {}
}, },
{
"path": "src/llm_codegen/checker.py",
"summary": "并行检查与修复模块,运行检查工具并收集错误",
"dependencies": [
"src/llm_codegen/core.py",
"src/llm_codegen/models.py"
],
"functions": [],
"classes": [],
"design_updates": {}
},
{ {
"path": "src/llm_codegen/utils.py", "path": "src/llm_codegen/utils.py",
"summary": "工具函数,如危险命令判断和文件操作", "summary": "工具函数,如危险命令判断和文件操作",
@ -287,6 +288,22 @@
} }
], ],
"design_updates": {} "design_updates": {}
},
{
"path": "src/llm_codegen/design_generator.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "tests/test_design_generator.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
} }
], ],
"commands": [ "commands": [

View File

@ -0,0 +1,49 @@
name: redesign-design-command.issue
description: |
当前 `llm-codegen design` 子命令仅能根据 README 文件生成 `design.json`,功能单一,无法满足项目演进过程中对设计文件进行增量维护的需求。
在实际开发中,开发者可能手动修改了代码文件,或者通过其他工具生成了代码,需要将这些变更同步回 `design.json` 以保持设计文档与代码的一致性。
因此,需要重新设计 `design` 子命令,使其具备以下能力:
- 支持指定一个或多个源文件Python 文件)作为输入,分析其内容并更新 `design.json` 中对应文件的条目(如摘要、依赖、函数列表、类列表等)。
- 如果指定的源文件在 `design.json` 中不存在,则自动添加新的文件条目。
- 如果未指定任何源文件,则默认仍然可以基于 README 生成初始 `design.json`(即保留原有功能)。
- 支持 `--force` 选项以强制覆盖现有条目,而不是合并更新。
- 提供 `--dry-run` 选项预览将要进行的更改而不实际写入文件。
- 所有分析过程仍通过 LLM 调用完成,确保生成的摘要、函数/类描述准确反映代码内容。
- 更新后的 `design.json` 应保持原有结构,且格式良好(缩进等)。
这一改进将使 `design` 命令成为维护项目设计文档的实用工具,而不仅仅是初始化工具。
affected_files:
- src/llm_codegen/cli.py
- src/llm_codegen/core.py
- src/llm_codegen/design_generator.py # 新建
- src/llm_codegen/models.py # 可能调整 FileModel 或新增方法
acceptance_criteria:
- 新增命令行选项:
- `--source` / `-s`:可重复使用,指定一个或多个源文件路径(相对于项目根目录)。
- `--force`:强制覆盖现有条目,而不是合并(合并策略见下文)。
- `--dry-run`:仅显示将要做的更改,不实际写入文件。
- 如果同时指定 `--source` 和 `--file`(原来的 README 参数),则行为需明确定义(例如先基于 README 生成框架,再根据源文件更新,或报错提示只能使用一种模式)。
- 当指定 `--source` 时,程序应:
1. 读取现有的 `design.json`(如果存在),否则视为空设计。
2. 对于每个源文件,读取其内容。
3. 调用 LLM 分析该文件内容,生成该文件的 `FileModel` 条目(包括 `summary`、`dependencies`、`functions`、`classes`)。
4. 将生成的条目与现有条目合并(除非使用 `--force`
- 合并规则:如果现有条目存在,则保留原有字段,仅更新 LLM 提供的字段(例如,如果 LLM 只返回了 `functions`,则只更新 `functions`,保留原有的 `summary` 和 `dependencies`;如果 LLM 返回了完整信息,则全部更新)。建议在 prompt 中要求 LLM 返回完整的文件条目,但由程序决定合并逻辑。
- 如果使用 `--force`,则直接替换整个文件条目。
5. 如果文件在 `design.json` 中不存在,则直接添加新条目。
6. 所有文件处理完成后,保存更新后的 `design.json`。
- 当未指定 `--source` 时,行为与旧版一致:根据 README 生成完整的 `design.json`(覆盖原有)。
- 为了保证 LLM 分析的质量system prompt 应明确要求返回符合 `FileModel` 结构的 JSON并提供示例。
- 更新 `cli.py` 中的 `design` 命令,添加上述选项,并调用相应的后端逻辑。
- 在 `core.py` 中现有 `update_file_entry` 方法可被复用或增强,但建议将设计相关的逻辑迁移到新的 `DesignGenerator` 类中(或直接在 `core.py` 中扩展),以保持代码清晰。
- 添加单元测试覆盖以下场景:
- 从单个源文件更新现有条目。
- 从多个源文件同时更新(包括新增和修改)。
- 使用 `--force` 覆盖已有条目。
- 使用 `--dry-run` 不实际写入。
- 未指定 `--source` 时仍能基于 README 生成。
- 处理不存在的源文件时给出友好错误提示。
- 确保与现有 `enhance`、`fix` 命令兼容,不会影响它们的正常功能。
- 所有代码通过 lint 和类型检查,日志记录完整。

View File

@ -0,0 +1,33 @@
# 需求工单:拆分 core.py 为多个专注模块
name: 重构 core.py拆分为多个单一职责的模块
description: |
当前 core.py 文件过于庞大,包含 LLM 调用、文件操作、命令执行、依赖排序、状态管理、设计文件维护等多个职责,导致代码难以维护和测试。需要将其拆分为多个独立的模块,每个模块负责一个清晰的功能领域,并通过组合方式在 BaseGenerator 中集成。
主要拆分目标:
- 创建 `llm_client.py`:封装 LLM API 调用、响应保存和思考过程记录。
- 创建 `file_operations.py`:处理文件读写、目录创建和 diff 应用。
- 创建 `command_executor.py`:执行系统命令,集成危险命令拦截。
- 创建 `dependency_sorter.py`:提供依赖关系的拓扑排序及循环检测。
- 创建 `design_manager.py`:管理 design.json 的加载、保存、更新及同步操作。
- 创建 `state_manager.py`:管理断点续写状态文件的读写(线程安全)。
- 精简 `core.py` 中的 BaseGenerator使其组合以上组件保留对外接口不变。
同时需要更新 README.md反映新的模块结构和设计思想。
affected_files:
- src/llm_codegen/core.py
- src/llm_codegen/llm_client.py # 新增
- src/llm_codegen/file_operations.py # 新增
- src/llm_codegen/command_executor.py # 新增
- src/llm_codegen/dependency_sorter.py # 新增
- src/llm_codegen/design_manager.py # 新增
- src/llm_codegen/state_manager.py # 新增
- README.md
acceptance_criteria:
- 所有原有功能init、enhance、fix、design 子命令)在重构后行为完全一致,不引入新 bug。
- core.py 中的 _call_llm、_topological_sort、文件读写、命令执行、状态保存、design 操作等逻辑均迁移至对应新模块BaseGenerator 仅保留组合与高层流程。
- 新模块职责单一,相互之间通过明确的接口调用,无循环依赖。
- 单元测试覆盖核心功能,且原有测试用例全部通过。
- README.md 中的“项目结构”部分更新为新文件列表,并简要说明各模块职责。
- 日志记录、进度显示、错误提示等用户体验相关功能保持不变。
- 代码风格符合项目规范(通过 black、pylint 等检查)。
- 生成对应的单元测试。

View File

@ -9,7 +9,7 @@ __version__ = "1.0.0"
__description__ = "一个基于大语言模型的智能代码生成与维护工具" __description__ = "一个基于大语言模型的智能代码生成与维护工具"
# 导出核心模块以便从包级别导入 # 导出核心模块以便从包级别导入
from .core import CodeGenerator # from .core import CodeGenerator
# from .cli import main # from .cli import main
__all__ = ["CodeGenerator", "__version__", "__description__"] __all__ = ["__version__", "__description__"]

View File

@ -1,499 +0,0 @@
import json
import subprocess
import sys
from typing import List, Dict, Optional, Tuple, Any
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import warnings
from loguru import logger
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn, TimeRemainingColumn
from .core import CodeGenerator
# 尝试导入 pathspec用于精确解析 .gitignore
try:
from pathspec import PathSpec
from pathspec.patterns import GitWildMatchPattern
HAS_PATHSPEC = True
except ImportError:
HAS_PATHSPEC = False
import fnmatch
warnings.warn(
"pathspec 未安装,将使用简单的通配符匹配处理 .gitignore可能不完全准确"
"建议安装pip install pathspec"
)
class Checker:
"""
并行检查与修复模块运行检查工具默认 black并收集错误
支持自动调用 LLM 生成修复补丁
"""
def __init__(
self,
output_dir: Path,
check_tools: Optional[List[str]] = None,
code_generator: Optional[CodeGenerator] = None,
api_key: Optional[str] = None,
base_url: str = "https://api.deepseek.com",
model: str = "deepseek-reasoner",
):
"""
初始化检查器
Args:
output_dir: 项目输出目录用于查找代码文件和保存检查结果
check_tools: 检查工具列表默认为 ["black"]若传入多个工具仅使用第一个
code_generator: CodeGenerator 实例用于调用 LLM
api_key, base_url, model: 用于创建 CodeGenerator code_generator None
"""
self.output_dir = Path(output_dir)
# 处理检查工具:默认 black若传入多个则取第一个并警告
if check_tools is None:
self.check_tools = ["black"]
else:
if len(check_tools) > 1:
logger.warning(
f"检测到多个检查工具 {check_tools},将只使用第一个:{check_tools[0]}"
)
self.check_tools = [check_tools[0]] if check_tools else ["black"]
if code_generator:
self.code_generator = code_generator
else:
self.code_generator = CodeGenerator(
api_key=api_key,
base_url=base_url,
model=model,
output_dir=str(self.output_dir),
)
self.results_file = self.output_dir / "check_results.json"
logger.info(f"Checker 初始化完成,输出目录: {self.output_dir},检查工具: {self.check_tools}")
def _load_gitignore_patterns(self) -> Optional[Any]:
"""
加载 .gitignore 文件中的模式返回一个可用于匹配的函数或对象
若文件不存在或解析失败返回 None
"""
gitignore_path = self.output_dir / ".gitignore"
if not gitignore_path.exists():
return None
try:
with open(gitignore_path, "r", encoding="utf-8") as f:
lines = f.readlines()
except Exception as e:
logger.warning(f"读取 .gitignore 失败: {e}")
return None
if HAS_PATHSPEC:
# 使用 pathspec 精确解析
try:
spec = PathSpec.from_lines(GitWildMatchPattern, lines)
return spec
except Exception as e:
logger.warning(f"解析 .gitignore 失败,将使用简单匹配: {e}")
return None
else:
# 回退到简单通配符匹配(忽略空行、注释,不支持 ** 和否定模式)
patterns = []
for line in lines:
line = line.strip()
if not line or line.startswith("#"):
continue
# 移除末尾的注释(# 后面的内容)
if "#" in line:
line = line.split("#", 1)[0].strip()
if line:
patterns.append(line)
return patterns
def _is_ignored_by_gitignore(self, rel_path: str, gitignore_matcher) -> bool:
"""
判断相对路径是否被 .gitignore 忽略
gitignore_matcher 可以是 PathSpec 对象或简单模式列表
"""
if gitignore_matcher is None:
return False
# 将路径转换为 POSIX 风格(使用 / 分隔符)
rel_path = rel_path.replace(os.sep, "/")
if HAS_PATHSPEC and isinstance(gitignore_matcher, PathSpec):
return gitignore_matcher.match_file(rel_path)
elif isinstance(gitignore_matcher, list):
# 简单匹配:对于每个模式,如果模式以 / 结尾,则匹配目录;否则匹配文件
for pattern in gitignore_matcher:
# 处理目录模式(以 / 结尾)
if pattern.endswith("/"):
# 检查路径是否以该目录开头
if rel_path.startswith(pattern.rstrip("/") + "/") or rel_path == pattern.rstrip("/"):
return True
else:
# 文件/通配符模式,使用 fnmatch
if fnmatch.fnmatch(rel_path, pattern):
return True
return False
def _filter_files_by_gitignore(self, files: List[Path]) -> List[Path]:
"""
根据 .gitignore 和硬编码规则 .git/过滤文件列表
"""
gitignore_matcher = self._load_gitignore_patterns()
filtered = []
for file_path in files:
try:
# 计算相对于输出目录的路径
rel_path = file_path.relative_to(self.output_dir).as_posix()
except ValueError:
# 如果文件不在输出目录下(例如绝对路径),则保留(不过滤)
logger.warning(f"文件 {file_path} 不在输出目录 {self.output_dir} 下,将保留")
filtered.append(file_path)
continue
# 硬编码忽略 .git 目录
if rel_path.startswith(".git/") or rel_path == ".git":
logger.debug(f"忽略 .git 目录下的文件: {rel_path}")
continue
# 检查 .gitignore
if self._is_ignored_by_gitignore(rel_path, gitignore_matcher):
logger.debug(f"忽略 .gitignore 中的文件: {rel_path}")
continue
filtered.append(file_path)
return filtered
def run_check(self, tool: str, file_path: Path) -> Dict[str, Any]:
"""
运行单个检查工具并返回结果
Args:
tool: 检查工具名称 'black'
file_path: 要检查的文件路径
Returns:
Dict 包含工具名返回码stdoutstderr 和错误信息
"""
logger.debug(f"运行检查工具: {tool} 在文件: {file_path}")
# 构建命令,根据工具不同调整
if tool == "black":
cmd = f"black --check --diff {file_path}"
elif tool == "pylint":
cmd = f"pylint {file_path} --output-format=json"
elif tool == "mypy":
cmd = f"mypy {file_path} --show-error-codes --no-error-summary"
else:
# 默认直接运行工具
cmd = f"{tool} {file_path}"
try:
result = subprocess.run(
cmd,
shell=True,
cwd=self.output_dir,
capture_output=True,
text=True,
timeout=60, # 1 分钟超时
)
# 解析错误信息
errors = []
if result.stderr:
errors.append(result.stderr.strip())
if result.stdout:
# 对于 pylint 的 JSON 输出,可以进一步解析
if tool == "pylint" and result.returncode != 0:
try:
pylint_errors = json.loads(result.stdout)
errors.extend([e.get("message", "") for e in pylint_errors])
except json.JSONDecodeError:
errors.append(result.stdout.strip())
elif result.returncode != 0:
errors.append(result.stdout.strip())
return {
"tool": tool,
"file": str(file_path),
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"errors": errors,
}
except subprocess.TimeoutExpired:
logger.error(f"检查工具 {tool} 超时: {cmd}")
return {
"tool": tool,
"file": str(file_path),
"returncode": -1,
"stdout": "",
"stderr": "检查超时",
"errors": ["检查超时"],
}
except Exception as e:
logger.error(f"运行检查工具 {tool} 失败: {e}")
return {
"tool": tool,
"file": str(file_path),
"returncode": -1,
"stdout": "",
"stderr": str(e),
"errors": [str(e)],
}
def run_parallel_checks(self, files: Optional[List[Path]] = None) -> List[Dict[str, Any]]:
"""
并行运行检查工具在指定文件上仅使用配置的第一个工具
Args:
files: 要检查的文件路径列表如果为 None 则自动查找输出目录下所有 .py 文件排除 .gitignore 中的
Returns:
检查结果列表
"""
if files is None:
# 递归查找所有 .py 文件
files = list(self.output_dir.rglob("*.py"))
# 过滤 .gitignore 中的文件
files = self._filter_files_by_gitignore(files)
# 只使用第一个工具(已在 __init__ 中保证 self.check_tools 只有一个元素)
tool = self.check_tools[0]
logger.info(f"开始并行检查,文件数: {len(files)},工具: {tool}")
all_results = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
TimeRemainingColumn(),
) as progress:
task = progress.add_task("[cyan]Running parallel checks...", total=len(files))
with ThreadPoolExecutor(max_workers=min(4, len(files))) as executor:
futures = [executor.submit(self.run_check, tool, file_path) for file_path in files]
error_count = 0 # 初始化错误计数
for future in as_completed(futures):
try:
result = future.result()
all_results.append(result)
# 检查并更新错误计数
if result.get("errors") and result["errors"]:
error_count += len(result["errors"])
except Exception as e:
logger.error(f"并行检查任务失败: {e}")
finally:
# 更新进度条:前进并更新描述以显示错误统计
progress.update(task, advance=1, description=f"[cyan]Running parallel checks... Errors: {error_count}")
# 保存结果到文件
self.save_results(all_results)
logger.info(f"并行检查完成,总结果数: {len(all_results)},总错误数: {error_count}")
return all_results
def save_results(self, results: List[Dict[str, Any]]) -> None:
"""保存检查结果到 JSON 文件"""
try:
with open(self.results_file, "w", encoding="utf-8") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
logger.debug(f"检查结果已保存至: {self.results_file}")
except Exception as e:
logger.error(f"保存检查结果失败: {e}")
def collect_errors(self, results: Optional[List[Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
"""
从检查结果中收集所有错误
Args:
results: 检查结果列表如果为 None 则从文件加载
Returns:
错误列表每个错误包含文件工具和错误信息
"""
if results is None:
if self.results_file.exists():
try:
with open(self.results_file, "r", encoding="utf-8") as f:
results = json.load(f)
except Exception as e:
logger.error(f"加载检查结果失败: {e}")
return []
else:
logger.warning("无检查结果文件,先运行检查")
return []
errors = []
for result in results:
if result.get("errors") and result["errors"]:
for error_msg in result["errors"]:
if error_msg: # 跳过空错误
errors.append({
"file": result["file"],
"tool": result["tool"],
"error": error_msg,
})
logger.info(f"收集到 {len(errors)} 个错误")
return errors
def auto_fix(self, errors: List[Dict[str, Any]], context_files: Optional[List[str]] = None) -> bool:
"""
自动调用 LLM 生成修复补丁并应用
Args:
errors: 错误列表来自 collect_errors
context_files: 上下文文件路径列表用于 LLM 生成修复
Returns:
bool: 修复是否成功至少修复了一个错误
"""
if not errors:
logger.info("没有错误需要修复")
return True
logger.info(f"开始自动修复 {len(errors)} 个错误")
# 准备上下文:包括 README、design.json 和相关代码文件
context_content = []
# 添加 README如果存在
readme_path = self.output_dir / "README.md"
if readme_path.exists():
with open(readme_path, "r", encoding="utf-8") as f:
context_content.append(f"### 项目 README ###\n{f.read()}\n")
# 添加 design.json如果存在
design_path = self.output_dir / "design.json"
if design_path.exists():
with open(design_path, "r", encoding="utf-8") as f:
context_content.append(f"### 设计文件: design.json ###\n{f.read()}\n")
# 添加错误相关的代码文件
if context_files is None:
context_files = list(set(error["file"] for error in errors))
for file_path in context_files:
path = Path(file_path)
if not path.exists():
path = self.output_dir / file_path
if path.exists():
with open(path, "r", encoding="utf-8") as f:
context_content.append(f"### 文件: {path.name} (路径: {file_path}) ###\n{f.read()}\n")
# 添加错误信息
errors_str = json.dumps(errors, indent=2, ensure_ascii=False)
context_content.append(f"### 检查错误列表 ###\n{errors_str}\n")
full_context = "\n".join(context_content)
# 调用 LLM 生成修复
system_prompt = (
"你是一个专业的编程助手,擅长修复代码错误。根据提供的上下文(包括项目 README、设计文件、相关代码和检查错误"
"生成修复补丁代码。返回严格的 JSON 对象,包含两个字段:\n"
"- patches: 数组,每个元素是一个对象,包含 'file'(文件路径)和 'code'(修复后的完整代码或差异)\n"
"- description: 简短的中文修复描述\n"
"注意:只修复提到的错误,保持代码风格一致。"
)
user_prompt = f"请修复以下检查错误:\n\n{full_context}"
try:
result = self.code_generator._call_llm(system_prompt, user_prompt, temperature=0.1)
patches = result.get("patches", [])
description = result.get("description", "无描述")
logger.info(f"LLM 生成修复补丁: {description}, 补丁数: {len(patches)}")
# 应用补丁,使用进度条
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
TimeRemainingColumn(),
) as progress:
task = progress.add_task("[cyan]Applying fixes...", total=len(patches))
success_count = 0
for patch in patches:
file_path = patch.get("file")
code = patch.get("code")
if not file_path or not code:
logger.warning(f"无效补丁: {patch}")
progress.update(task, advance=1)
continue
full_path = self.output_dir / file_path
try:
with open(full_path, "w", encoding="utf-8") as f:
f.write(code)
logger.info(f"已应用修复到文件: {file_path}")
success_count += 1
except Exception as e:
logger.error(f"应用修复失败到文件 {file_path}: {e}")
finally:
progress.update(task, advance=1)
logger.info(f"自动修复完成,成功修复 {success_count}/{len(patches)} 个补丁")
return success_count > 0
except Exception as e:
logger.error(f"调用 LLM 生成修复失败: {e}")
return False
def run_full_check_and_fix(self, max_retries: int = 3) -> bool:
"""
运行完整检查与修复循环直到无错误或达到最大重试次数
Args:
max_retries: 最大修复重试次数
Returns:
bool: 是否成功无错误或修复后无错误
"""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
TimeRemainingColumn(),
) as progress:
task = progress.add_task("[cyan]Full check and fix cycle", total=max_retries)
for attempt in range(max_retries):
progress.update(task, description=f"[cyan]Attempt {attempt + 1}/{max_retries}")
logger.info(f"检查与修复循环,尝试 {attempt + 1}/{max_retries}")
# 运行并行检查
results = self.run_parallel_checks()
errors = self.collect_errors(results)
if not errors:
progress.update(task, completed=max_retries)
logger.success("所有检查通过,无错误")
return True
logger.warning(f"发现 {len(errors)} 个错误,尝试自动修复")
success = self.auto_fix(errors)
if not success:
logger.error(f"{attempt + 1} 次修复失败")
progress.update(task, advance=1)
if attempt == max_retries - 1:
return False
else:
logger.info(f"{attempt + 1} 次修复成功,重新检查")
progress.update(task, advance=1)
# 最后一次检查
progress.update(task, description="[cyan]Final check...")
results = self.run_parallel_checks()
errors = self.collect_errors(results)
if errors:
logger.error(f"修复后仍有 {len(errors)} 个错误")
return False
else:
logger.success("修复后所有检查通过")
return True

View File

@ -17,7 +17,7 @@ from .core import BaseGenerator
from .init_generator import InitGenerator from .init_generator import InitGenerator
from .enhance_generator import EnhanceGenerator from .enhance_generator import EnhanceGenerator
from .fix_generator import FixGenerator from .fix_generator import FixGenerator
from .checker import Checker from .design_generator import DesignGenerator # 新增导入DesignGenerator
app = typer.Typer(help="基于LLM的自动化代码生成与维护工具") app = typer.Typer(help="基于LLM的自动化代码生成与维护工具")
console = Console() console = Console()
@ -129,14 +129,6 @@ def enhance(
f"design.json 不存在于 {output_dir},请先运行 init 命令初始化项目。" f"design.json 不存在于 {output_dir},请先运行 init 命令初始化项目。"
) )
raise typer.Exit(code=1) raise typer.Exit(code=1)
# 读取工单文件
try:
with open(issue_file, "r", encoding="utf-8") as f:
issue_content = f.read()
except Exception as e:
logger.error(f"读取工单文件失败: {e}")
raise typer.Exit(code=1)
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
@ -201,13 +193,6 @@ def fix(
logger.error(f"design.json 不存在于 {output_dir},请确保项目已初始化。") logger.error(f"design.json 不存在于 {output_dir},请确保项目已初始化。")
raise typer.Exit(code=1) raise typer.Exit(code=1)
# 读取工单文件
try:
with open(issue_file, "r", encoding="utf-8") as f:
issue_content = f.read()
except Exception as e:
logger.error(f"读取工单文件失败: {e}")
raise typer.Exit(code=1)
try: try:
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
@ -240,19 +225,28 @@ def fix(
@app.command() @app.command()
def design( def design(
file: Path = typer.Option( file: Optional[Path] = typer.Option(
..., None,
"--file", "--file",
"-f", "-f",
help="README文件路径用于生成design.json", help="README文件路径用于生成design.json;如果与--source同时使用则生成design.json后从源代码刷新",
exists=True, exists=True,
file_okay=True, file_okay=True,
dir_okay=False, dir_okay=False,
), ),
source: Optional[Path] = typer.Option(
None,
"--source",
help="源代码目录路径用于从源代码刷新design.json必须为目录",
exists=True,
file_okay=False,
dir_okay=True,
),
output_dir: Optional[Path] = typer.Option( output_dir: Optional[Path] = typer.Option(
None, "--output", "-o", help="输出目录design.json将保存在此默认为当前目录" None, "--output", "-o", help="输出目录design.json将保存在此默认为当前目录"
), ),
force: bool = typer.Option(False, "--force", help="强制覆盖已存在的design.json"), force: bool = typer.Option(False, "--force", help="强制覆盖已存在的design.json或强制从源代码刷新"),
dry_run: bool = typer.Option(False, "--dry-run", help="模拟运行,不实际写入文件或执行命令,仅打印信息"),
api_key: Optional[str] = typer.Option( api_key: Optional[str] = typer.Option(
None, "--api-key", envvar="DEEPSEEK_APIKEY", help="API密钥" None, "--api-key", envvar="DEEPSEEK_APIKEY", help="API密钥"
), ),
@ -265,21 +259,19 @@ def design(
4, "--max-concurrency", help="并发生成的最大工作线程数默认4" 4, "--max-concurrency", help="并发生成的最大工作线程数默认4"
), ),
): ):
"""生成或更新design.json根据README文件生成中间设计文件不生成完整代码""" """生成或更新design.json支持从README生成、从源代码刷新并集成新的设计生成逻辑"""
if output_dir is None: if output_dir is None:
output_dir = Path.cwd() output_dir = Path.cwd()
# 初始化日志配置 # 初始化日志配置
log_file_path = init_logging(output_dir, log_file, command_name="design") log_file_path = init_logging(output_dir, log_file, command_name="design")
# 检查design.json是否存在并处理强制覆盖 # 检查是否提供了至少一个操作参数
design_path = output_dir / "design.json" if file is None and source is None:
if not force and design_path.exists(): logger.error("必须提供 --file 或 --source 参数之一来执行操作。")
logger.error(
f"design.json 已存在于 {design_path}。使用 --force 参数以强制覆盖。"
)
raise typer.Exit(code=1) raise typer.Exit(code=1)
# 初始化DesignGenerator以集成新的设计生成逻辑
try: try:
with Progress( with Progress(
SpinnerColumn(), SpinnerColumn(),
@ -287,8 +279,8 @@ def design(
BarColumn(), BarColumn(),
console=console, console=console,
) as progress: ) as progress:
task_id = progress.add_task("正在生成design.json...", total=1) # 可选:保持现有风格,但工单未要求修改此命令 task_id = progress.add_task("正在处理design命令...", total=1)
generator = BaseGenerator( generator = DesignGenerator(
api_key=api_key, api_key=api_key,
base_url=base_url, base_url=base_url,
model=model, model=model,
@ -296,61 +288,51 @@ def design(
log_file=log_file_path, log_file=log_file_path,
max_concurrency=max_concurrency, max_concurrency=max_concurrency,
) )
# 解析README文件并设置内容
generator.readme_content = generator.parse_readme(file) design_path = output_dir / "design.json"
# 生成design.json
generator.generate_design_json() # 处理--dry-run选项
progress.update(task_id, completed=1, description="design.json 生成完成") # 可选:更新完成状态 if dry_run:
console.print(f"[green]✅ design.json 已生成在 {design_path}[/green]") console.print("[yellow]模拟运行模式:不会实际写入文件或执行命令。[/yellow]")
if file is not None:
logger.info(f"模拟将从README文件 {file} 生成design.json")
# 在dry-run模式下仅模拟解析README
content = generator.parse_readme(file)
console.print(f"[blue]模拟解析README内容完成长度: {len(content)} 字符[/blue]")
if source is not None:
logger.info(f"模拟:将从源代码目录 {source} 刷新design.json")
# 模拟分析源代码
design_info = generator.analyze_source_files(source)
console.print(f"[blue]模拟分析完成,共分析 {len(design_info['files'])} 个文件[/blue]")
progress.update(task_id, completed=1, description="模拟运行完成")
console.print("[green]✅ 模拟运行完成,无实际文件操作。[/green]")
return
# 实际运行逻辑
if file is not None:
# 生成design.json从README
if not force and design_path.exists():
logger.error(
f"design.json 已存在于 {design_path}。使用 --force 参数以强制覆盖。"
)
raise typer.Exit(code=1)
generator.run(readme_path=file)
logger.info(f"已从README生成design.json: {design_path}")
if source is not None:
# 从源代码刷新design.json
success = generator.refresh_design_from_source(source)
if not success:
logger.error("从源代码刷新design.json失败")
raise typer.Exit(code=1)
logger.info(f"已从源代码刷新design.json: {design_path}")
progress.update(task_id, completed=1, description="design命令处理完成")
console.print(f"[green]✅ design.json 已处理完成,路径: {design_path}[/green]")
except Exception as e: except Exception as e:
logger.error(f"生成design.json失败: {e}") logger.error(f"处理design命令失败: {e}")
raise typer.Exit(code=1)
@app.command()
def check(
output_dir: Optional[Path] = typer.Option(
None, "--output", "-o", help="项目根目录,默认为当前目录"
),
api_key: Optional[str] = typer.Option(
None, "--api-key", envvar="DEEPSEEK_APIKEY", help="API密钥"
),
base_url: str = typer.Option(
"https://api.deepseek.com", "--base-url", help="API基础URL"
),
model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"),
log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"),
max_retries: int = typer.Option(3, "--max-retries", help="最大修复重试次数"),
max_concurrency: int = typer.Option(
4, "--max-concurrency", help="并发生成的最大工作线程数默认4"
),
):
"""运行代码检查和自动修复(不依赖于工单)"""
if output_dir is None:
output_dir = Path.cwd()
# 初始化日志配置
log_file_path = init_logging(output_dir, log_file, command_name="check")
try:
generator = BaseGenerator(
api_key=api_key,
base_url=base_url,
model=model,
output_dir=str(output_dir),
log_file=log_file_path,
max_concurrency=max_concurrency,
)
checker = Checker(output_dir=output_dir, code_generator=generator)
success = checker.run_full_check_and_fix(max_retries=max_retries)
if not success:
logger.error("检查修复失败")
raise typer.Exit(code=1)
console.print("[green]检查与修复完成。详情请查看日志。[/green]")
except Exception as e:
logger.error(f"检查失败: {e}")
raise typer.Exit(code=1) raise typer.Exit(code=1)
if __name__ == "__main__": if __name__ == "__main__":
app() app()

View File

@ -15,11 +15,15 @@ from loguru import logger
from openai import OpenAI from openai import OpenAI
from .utils import is_dangerous_command from .utils import is_dangerous_command
from .models import DesignModel, StateModel, FileModel, FileStatus # 添加 FileStatus 导入 from .models import (
from .diff_applier import parse_diff, apply_diff DesignModel,
StateModel,
FileModel,
FileStatus,
) # 添加 FileStatus 导入
class BaseGenerator: class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
"""代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行"""
def __init__( def __init__(
@ -29,7 +33,7 @@ class BaseGenerator:
model: str = "deepseek-reasoner", model: str = "deepseek-reasoner",
output_dir: str = "./generated", output_dir: str = "./generated",
log_file: Optional[str] = None, log_file: Optional[str] = None,
max_concurrency: int = 4 max_concurrency: int = 4,
): ):
""" """
初始化生成器 初始化生成器
@ -118,12 +122,12 @@ class BaseGenerator:
"system_prompt": system_prompt, "system_prompt": system_prompt,
"user_prompt": user_prompt, "user_prompt": user_prompt,
"temperature": temperature, "temperature": temperature,
"expect_json": expect_json "expect_json": expect_json,
} }
with open(response_file, "w", encoding="utf-8") as f: with open(response_file, "w", encoding="utf-8") as f:
json.dump(response_data, f, indent=2, ensure_ascii=False) json.dump(response_data, f, indent=2, ensure_ascii=False)
logger.debug(f"LLM原始响应: {response_file.name}") logger.debug(f"LLM原始响应: {response_file.name}")
if expect_json: if expect_json:
@ -142,7 +146,6 @@ class BaseGenerator:
self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]")
raise raise
def parse_readme(self, readme_path: Path) -> str: def parse_readme(self, readme_path: Path) -> str:
""" """
读取README文件内容 读取README文件内容
@ -168,17 +171,17 @@ class BaseGenerator:
"返回严格的 JSON 对象符合DesignModel结构。" "返回严格的 JSON 对象符合DesignModel结构。"
) )
user_prompt = f"README内容如下\n\n{self.readme_content}" user_prompt = f"README内容如下\n\n{self.readme_content}"
result = self._call_llm(system_prompt, user_prompt) result = self._call_llm(system_prompt, user_prompt)
design_data = result design_data = result
design = DesignModel(**design_data) design = DesignModel(**design_data)
# 写入design.json文件 # 写入design.json文件
design_path = self.output_dir / "design.json" design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f: with open(design_path, "w", encoding="utf-8") as f:
json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) json.dump(design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"已生成design.json: {design_path}") logger.info(f"已生成design.json: {design_path}")
return design return design
def load_state(self) -> Optional[StateModel]: def load_state(self) -> Optional[StateModel]:
@ -188,7 +191,9 @@ class BaseGenerator:
with open(self.state_file, "r", encoding="utf-8") as f: with open(self.state_file, "r", encoding="utf-8") as f:
state_data = json.load(f) state_data = json.load(f)
self.state = StateModel(**state_data) self.state = StateModel(**state_data)
logger.info(f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)}") logger.info(
f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)}"
)
return self.state return self.state
except Exception as e: except Exception as e:
logger.error(f"加载状态失败: {e}") logger.error(f"加载状态失败: {e}")
@ -196,7 +201,9 @@ class BaseGenerator:
return None return None
return None return None
def save_state(self, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None: def save_state(
self, generated_files: List[str], dependencies_map: Dict[str, List[str]]
) -> None:
"""保存断点续写状态,适应并发生成(线程安全)""" """保存断点续写状态,适应并发生成(线程安全)"""
with self._state_lock: # 串行化写入 with self._state_lock: # 串行化写入
state = StateModel( state = StateModel(
@ -205,13 +212,12 @@ class BaseGenerator:
dependencies_map=dependencies_map, dependencies_map=dependencies_map,
total_files=len(self.design.files) if self.design else 0, total_files=len(self.design.files) if self.design else 0,
output_dir=str(self.output_dir), output_dir=str(self.output_dir),
readme_path=self.readme_content[:100] if self.readme_content else "" readme_path=self.readme_content[:100] if self.readme_content else "",
) )
with open(self.state_file, "w", encoding="utf-8") as f: with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) json.dump(state.model_dump(), f, indent=2, ensure_ascii=False)
logger.debug(f"状态已保存: {self.state_file}") logger.debug(f"状态已保存: {self.state_file}")
def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]:
""" """
从design.json获取文件列表和依赖关系 从design.json获取文件列表和依赖关系
@ -223,24 +229,26 @@ class BaseGenerator:
""" """
if not self.design: if not self.design:
raise ValueError("design.json未加载请先调用generate_design_json") raise ValueError("design.json未加载请先调用generate_design_json")
files = [file.path for file in self.design.files] files = [file.path for file in self.design.files]
dependencies = {file.path: file.dependencies for file in self.design.files} dependencies = {file.path: file.dependencies for file in self.design.files}
logger.info(f"从design.json解析到 {len(files)} 个待生成文件") logger.info(f"从design.json解析到 {len(files)} 个待生成文件")
logger.debug(f"文件列表: {files}") logger.debug(f"文件列表: {files}")
logger.debug(f"依赖关系: {dependencies}") logger.debug(f"依赖关系: {dependencies}")
return files, dependencies return files, dependencies
def _add_implicit_dependencies(self, files: List[str], dependencies: Dict[str, List[str]]) -> Dict[str, List[str]]: def _add_implicit_dependencies(
self, files: List[str], dependencies: Dict[str, List[str]]
) -> Dict[str, List[str]]:
""" """
添加隐式依赖关系基于文件路径和常见模式 添加隐式依赖关系基于文件路径和常见模式
Args: Args:
files: 文件路径列表 files: 文件路径列表
dependencies: 原始依赖字典 dependencies: 原始依赖字典
Returns: Returns:
Dict[str, List[str]]: 增强后的依赖字典 Dict[str, List[str]]: 增强后的依赖字典
""" """
@ -251,75 +259,17 @@ class BaseGenerator:
# 添加同一目录下的其他文件作为隐式依赖(简单示例) # 添加同一目录下的其他文件作为隐式依赖(简单示例)
path = Path(file) path = Path(file)
implicit_deps = [ implicit_deps = [
f for f in files f
if f != file and Path(f).parent == path.parent and f not in enhanced[file] for f in files
if f != file
and Path(f).parent == path.parent
and f not in enhanced[file]
] ]
if implicit_deps: if implicit_deps:
enhanced[file].extend(implicit_deps) enhanced[file].extend(implicit_deps)
logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}") logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}")
return enhanced return enhanced
def _apply_diff(self, diff: str, original_content: str) -> str:
"""
应用 unified diff 到原始内容返回修改后的内容
Args:
diff: 字符串形式的 unified diff
original_content: 原始文件内容
Returns:
str: 应用 diff 后的内容
Raises:
Exception: 如果应用 diff 失败
"""
try:
# 解析 diff 行
diff_lines = diff.splitlines(keepends=True)
if not diff_lines:
raise ValueError("diff 为空")
# 简单的 diff 应用逻辑:假设 diff 是标准 unified diff逐行处理
# 注意:这是一个简化实现,对于复杂 diff 可能不准确,建议使用专用库如 `patch`
original_lines = original_content.splitlines(keepends=True)
result_lines = []
i = 0
j = 0
while i < len(diff_lines):
line = diff_lines[i]
if line.startswith('--- ') or line.startswith('+++ '):
i += 1
continue
elif line.startswith('@@ '):
i += 1
continue
elif line.startswith(' '):
# 未修改行
if j < len(original_lines):
result_lines.append(original_lines[j])
j += 1
i += 1
elif line.startswith('-'):
# 删除行
j += 1
i += 1
elif line.startswith('+'):
# 新增行
result_lines.append(line[1:])
i += 1
else:
i += 1 # 跳过未知行
# 添加剩余原始行
while j < len(original_lines):
result_lines.append(original_lines[j])
j += 1
return ''.join(result_lines)
except Exception as e:
logger.error(f"应用 diff 时出错: {e}")
raise RuntimeError(f"无法应用 diff: {e}")
def generate_file( def generate_file(
self, self,
file_path: str, file_path: str,
@ -330,32 +280,34 @@ class BaseGenerator:
) -> Tuple[str, str, List[str]]: ) -> Tuple[str, str, List[str]]:
""" """
生成单个文件返回 (代码, 描述, 命令列表) 生成单个文件返回 (代码, 描述, 命令列表)
Args: Args:
file_path: 目标文件路径 file_path: 目标文件路径
prompt_instruction: 生成指令 prompt_instruction: 生成指令
dependency_files: 依赖文件列表用于上下文 dependency_files: 依赖文件列表用于上下文
existing_content: 文件现有内容若为修改模式 existing_content: 文件现有内容若为修改模式
output_format: 输出格式'full' 'diff'来自 models.py output_format: 输出格式'full'来自 models.py
""" """
# 收集上下文内容 # 收集上下文内容
context_content = [] context_content = []
if self.readme_content: if self.readme_content:
context_content.append(f"### 项目 README ###\n{self.readme_content}\n") context_content.append(f"### 项目 README ###\n{self.readme_content}\n")
# 添加 design.json 上下文 # 添加 design.json 上下文
design_path = self.output_dir / "design.json" design_path = self.output_dir / "design.json"
if design_path.exists(): if design_path.exists():
try: try:
with open(design_path, "r", encoding="utf-8") as f: with open(design_path, "r", encoding="utf-8") as f:
design_content = f.read() design_content = f.read()
context_content.append(f"### 设计文件: design.json ###\n{design_content}\n") context_content.append(
f"### 设计文件: design.json ###\n{design_content}\n"
)
except Exception as e: except Exception as e:
logger.error(f"读取design.json失败: {e}") logger.error(f"读取design.json失败: {e}")
self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]")
# 如果design.json读取失败可能无法继续但保持上下文为空或部分 # 如果design.json读取失败可能无法继续但保持上下文为空或部分
# 添加依赖文件内容(仅读取存在的文件) # 添加依赖文件内容(仅读取存在的文件)
for dep in dependency_files: for dep in dependency_files:
dep_path = Path(dep) dep_path = Path(dep)
@ -365,116 +317,94 @@ class BaseGenerator:
dep_path = alt_path dep_path = alt_path
else: else:
logger.warning(f"依赖文件不存在,已跳过: {dep}") logger.warning(f"依赖文件不存在,已跳过: {dep}")
self.console.print(f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]") self.console.print(
f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]"
)
continue continue
try: try:
with open(dep_path, "r", encoding="utf-8") as f: with open(dep_path, "r", encoding="utf-8") as f:
content = f.read() content = f.read()
context_content.append(f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n") context_content.append(
f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n"
)
except Exception as e: except Exception as e:
logger.error(f"读取依赖文件 {dep} 失败: {e}") logger.error(f"读取依赖文件 {dep} 失败: {e}")
self.console.print(f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]") self.console.print(
f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]"
)
# 跳过此依赖文件 # 跳过此依赖文件
# 如果有现有内容,也加入上下文 # 如果有现有内容,也加入上下文
if existing_content is not None: if existing_content is not None:
context_content.append(f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n") context_content.append(
f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n"
)
full_context = "\n".join(context_content) full_context = "\n".join(context_content)
# 根据 output_format 设置 system_prompt # output_format 为 'full' 或其他,保持现有逻辑
if output_format == "diff": if existing_content is not None:
if existing_content is None:
logger.error("对于 output_format='diff',必须提供 existing_content")
self.console.print("[bold red]❌ 对于 output_format='diff',必须提供 existing_content[/bold red]")
return "# 错误:缺少现有内容", "生成失败,缺少现有内容", []
system_prompt = ( system_prompt = (
"你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成文件的差异diff" "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。"
"返回严格的 JSON 对象,包含四个字段:\n" "返回严格的 JSON 对象,包含四个字段:\n"
"- diff: (string) 文件的差异,使用 unified diff 格式\n" "- code: (string) 修改后的完整代码\n"
"- description: (string) 简短的中文修改描述\n" "- description: (string) 简短的中文修改描述\n"
"- commands: (array of string) 修改此文件后需要执行的操作系统命令列表,若无则返回空数组\n" "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n"
"- output_format: (string) 应为 'diff'" "- output_format: (string) 应为 'full'"
) )
else: else:
# output_format 为 'full' 或其他,保持现有逻辑 system_prompt = (
if existing_content is not None: "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。"
system_prompt = ( "返回严格的 JSON 对象,包含四个字段:\n"
"你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。" "- code: (string) 生成的完整代码\n"
"返回严格的 JSON 对象,包含四个字段:\n" "- description: (string) 简短的中文功能描述\n"
"- code: (string) 修改后的完整代码\n" "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n"
"- description: (string) 简短的中文修改描述\n" "- output_format: (string) 应为 'full'"
"- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" )
"- output_format: (string) 应为 'full'"
)
else:
system_prompt = (
"你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。"
"返回严格的 JSON 对象,包含四个字段:\n"
"- code: (string) 生成的完整代码\n"
"- description: (string) 简短的中文功能描述\n"
"- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n"
"- output_format: (string) 应为 'full'"
)
user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}"
if output_format == "diff":
user_prompt += f"\noutput_format: {output_format}"
try: try:
result = self._call_llm(system_prompt, user_prompt) result = self._call_llm(system_prompt, user_prompt)
# 解析响应,假设包含 output_format 字段 code = result.get("code")
if output_format == "diff": description = result.get("description", "")
diff = result.get("diff") commands = result.get("commands", [])
description = result.get("description", "") result.get("output_format", "full")
commands = result.get("commands", []) if code is None:
result.get("output_format", "diff") raise ValueError("LLM 响应中没有 code 字段")
if diff is None: return code, description, commands
raise ValueError("LLM 响应中没有 diff 字段")
# 调用 diff_applier 应用 diff
try:
chunks = parse_diff(diff)
code, conflicts = apply_diff(existing_content, chunks)
if conflicts:
logger.warning(f"应用diff时发现冲突: {conflicts}")
# 可以记录冲突,但继续处理
except Exception as e:
logger.error(f"应用 diff 时发生意外错误: {e}")
self.console.print(f"[bold red]❌ 应用 diff 时发生意外错误: {e}[/bold red]")
return "# 应用 diff 失败", f"应用 diff 时发生意外错误: {e}", []
return code, description, commands
else:
code = result.get("code")
description = result.get("description", "")
commands = result.get("commands", [])
result.get("output_format", "full")
if code is None:
raise ValueError("LLM 响应中没有 code 字段")
return code, description, commands
except Exception as e: except Exception as e:
logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}")
self.console.print(f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]") self.console.print(
f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]"
)
# 返回默认值以便继续 # 返回默认值以便继续
return "# 生成失败,请检查日志", "生成失败,发生错误", [] return "# 生成失败,请检查日志", "生成失败,发生错误", []
def _generate_file_task(self, file_path: str, dependencies: List[str], generated_files: set) -> Tuple[bool, str]: def _generate_file_task(
self, file_path: str, dependencies: List[str], generated_files: set
) -> Tuple[bool, str]:
""" """
并发任务函数用于生成单个文件 并发任务函数用于生成单个文件
Args: Args:
file_path: 文件路径 file_path: 文件路径
dependencies: 依赖文件列表 dependencies: 依赖文件列表
generated_files: 已生成文件的集合用于上下文 generated_files: 已生成文件的集合用于上下文
Returns: Returns:
Tuple[bool, str]: (是否成功, 错误信息或空字符串) Tuple[bool, str]: (是否成功, 错误信息或空字符串)
""" """
try: try:
instruction = f"请根据README描述和依赖文件生成文件 '{file_path}' 的完整代码。" instruction = (
f"请根据README描述和依赖文件生成文件 '{file_path}' 的完整代码。"
)
# 过滤依赖文件,只使用已生成的 # 过滤依赖文件,只使用已生成的
available_deps = [dep for dep in dependencies if dep in generated_files] available_deps = [dep for dep in dependencies if dep in generated_files]
code, desc, commands = self.generate_file(file_path, instruction, available_deps) code, desc, commands = self.generate_file(
file_path, instruction, available_deps
)
logger.info(f"生成完成: {file_path} - {desc}") logger.info(f"生成完成: {file_path} - {desc}")
# 写入文件 # 写入文件
@ -495,7 +425,9 @@ class BaseGenerator:
logger.error(f"生成文件 {file_path} 失败: {e}") logger.error(f"生成文件 {file_path} 失败: {e}")
return False, str(e) return False, str(e)
def _topological_sort(self, files: List[str], dependencies: Dict[str, List[str]]) -> List[str]: def _topological_sort(
self, files: List[str], dependencies: Dict[str, List[str]]
) -> List[str]:
""" """
对文件列表进行拓扑排序基于依赖关系 对文件列表进行拓扑排序基于依赖关系
返回排序后的列表满足每个文件的依赖项都出现在该文件之前 返回排序后的列表满足每个文件的依赖项都出现在该文件之前
@ -510,9 +442,9 @@ class BaseGenerator:
# 构建图如果文件f依赖于dep则增加f的入度并将f加入rev_graph[dep] # 构建图如果文件f依赖于dep则增加f的入度并将f加入rev_graph[dep]
for f in files: for f in files:
for dep in dependencies.get(f, []): for dep in dependencies.get(f, []):
if dep in files: # 只考虑在files中的依赖 if dep in files: # 只考虑在files中的依赖
in_degree[f] += 1 # f依赖于dep所以f的入度增加 in_degree[f] += 1 # f依赖于dep所以f的入度增加
rev_graph[dep].append(f) # dep被f依赖 rev_graph[dep].append(f) # dep被f依赖
# 队列初始化为入度为0的文件无依赖的文件 # 队列初始化为入度为0的文件无依赖的文件
queue = deque([f for f in files if in_degree[f] == 0]) queue = deque([f for f in files if in_degree[f] == 0])
@ -529,7 +461,9 @@ class BaseGenerator:
# 检查是否所有文件都已排序(无循环依赖) # 检查是否所有文件都已排序(无循环依赖)
if len(sorted_files) != len(files): if len(sorted_files) != len(files):
raise ValueError(f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。") raise ValueError(
f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。"
)
return sorted_files return sorted_files
@ -543,7 +477,9 @@ class BaseGenerator:
dangerous, reason = is_dangerous_command(cmd) dangerous, reason = is_dangerous_command(cmd)
if dangerous: if dangerous:
logger.error(f"危险命令被阻止: {cmd},原因: {reason}") logger.error(f"危险命令被阻止: {cmd},原因: {reason}")
self.console.print(f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]") self.console.print(
f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]"
)
return False return False
logger.info(f"执行命令: {cmd}") logger.info(f"执行命令: {cmd}")
@ -563,7 +499,9 @@ class BaseGenerator:
logger.warning(f"stderr: {result.stderr[:500]}") logger.warning(f"stderr: {result.stderr[:500]}")
if result.returncode != 0: if result.returncode != 0:
logger.error(f"命令执行失败,返回码: {result.returncode}") logger.error(f"命令执行失败,返回码: {result.returncode}")
self.console.print(f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]") self.console.print(
f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]"
)
return False return False
return True return True
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
@ -592,6 +530,16 @@ class BaseGenerator:
) )
# 将现有 design.json 内容作为上下文的一部分 # 将现有 design.json 内容作为上下文的一部分
if not self.design:
design_path = self.output_dir / "design.json"
try:
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
self.design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
raise e
design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False) design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False)
user_prompt = ( user_prompt = (
f"工单类型: {issue_type}\n" f"工单类型: {issue_type}\n"
@ -602,7 +550,9 @@ class BaseGenerator:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2) result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
return result return result
def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]): def _update_design(
self, generated_files: List[str], design_updates: Dict[str, Any]
):
""" """
根据生成的变更更新 design.json 根据生成的变更更新 design.json
使用 FileModel 来处理文件信息 使用 FileModel 来处理文件信息
@ -616,7 +566,7 @@ class BaseGenerator:
if not exists: if not exists:
# 获取更新信息 # 获取更新信息
update_info = design_updates.get(file_path, {}) update_info = design_updates.get(file_path, {})
# 创建新文件条目FileModel实例 # 创建新文件条目FileModel实例
new_file = FileModel( new_file = FileModel(
path=file_path, path=file_path,
@ -624,7 +574,7 @@ class BaseGenerator:
dependencies=update_info.get("dependencies", []), dependencies=update_info.get("dependencies", []),
functions=update_info.get("functions", []), functions=update_info.get("functions", []),
classes=update_info.get("classes", []), classes=update_info.get("classes", []),
design_updates=update_info.get("design_updates", {}) design_updates=update_info.get("design_updates", {}),
) )
self.design.files.append(new_file) self.design.files.append(new_file)
updated = True updated = True
@ -654,13 +604,17 @@ class BaseGenerator:
self.readme_content = self.parse_readme(readme_path) self.readme_content = self.parse_readme(readme_path)
except Exception as e: except Exception as e:
logger.error(f"读取README.md失败无法刷新design: {e}") logger.error(f"读取README.md失败无法刷新design: {e}")
self.console.print(f"[bold red]❌ 读取README.md失败无法刷新design: {e}[/bold red]") self.console.print(
f"[bold red]❌ 读取README.md失败无法刷新design: {e}[/bold red]"
)
return False return False
else: else:
logger.error("没有README内容且README.md文件不存在无法刷新design") logger.error("没有README内容且README.md文件不存在无法刷新design")
self.console.print("[bold red]❌ 没有README内容且README.md文件不存在无法刷新design[/bold red]") self.console.print(
"[bold red]❌ 没有README内容且README.md文件不存在无法刷新design[/bold red]"
)
return False return False
try: try:
self.design = self.generate_design_json() self.design = self.generate_design_json()
logger.info("design.json已成功重新生成") logger.info("design.json已成功重新生成")
@ -682,7 +636,9 @@ class BaseGenerator:
design_path = self.output_dir / "design.json" design_path = self.output_dir / "design.json"
if not design_path.exists(): if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}") logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]") self.console.print(
f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
)
return False return False
try: try:
with open(design_path, "r", encoding="utf-8") as f: with open(design_path, "r", encoding="utf-8") as f:
@ -692,8 +648,8 @@ class BaseGenerator:
logger.error(f"加载design.json失败: {e}") logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
return False return False
# 调用LLM分析文件内容返回更新信息 # 调用LLM分析文件内容返回更新信息增强以支持design_updates字段
system_prompt = ( system_prompt = (
"你是一个软件架构师。分析给定的文件内容并返回对design.json中该文件条目的更新。" "你是一个软件架构师。分析给定的文件内容并返回对design.json中该文件条目的更新。"
"返回严格的JSON对象包含以下字段\n" "返回严格的JSON对象包含以下字段\n"
@ -701,6 +657,7 @@ class BaseGenerator:
"- dependencies: 依赖文件列表\n" "- dependencies: 依赖文件列表\n"
"- functions: 函数列表每个对象有name, summary, inputs, outputs\n" "- functions: 函数列表每个对象有name, summary, inputs, outputs\n"
"- classes: 类列表每个对象有name, summary, methods\n" "- classes: 类列表每个对象有name, summary, methods\n"
"- design_updates: 可选,设计更新字典\n"
"注意仅返回JSON不要其他文本。" "注意仅返回JSON不要其他文本。"
) )
# 准备当前design.json中该文件的条目信息 # 准备当前design.json中该文件的条目信息
@ -710,11 +667,11 @@ class BaseGenerator:
current_entry = f.model_dump() current_entry = f.model_dump()
break break
user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目如果存在:\n{json.dumps(current_entry, indent=2) if current_entry else ''}" user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目如果存在:\n{json.dumps(current_entry, indent=2) if current_entry else ''}"
try: try:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2) result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
update_info = result update_info = result
# 查找或创建文件条目 # 查找或创建文件条目
file_model = None file_model = None
for f in self.design.files: for f in self.design.files:
@ -722,30 +679,40 @@ class BaseGenerator:
file_model = f file_model = f
break break
if file_model is None: if file_model is None:
# 创建新条目 # 创建新条目包括design_updates
file_model = FileModel( new_file = FileModel(
path=file_path, path=file_path,
summary=update_info.get("summary", ""), summary=update_info.get("summary", ""),
dependencies=update_info.get("dependencies", []), dependencies=update_info.get("dependencies", []),
functions=update_info.get("functions", []), functions=update_info.get("functions", []),
classes=update_info.get("classes", []) classes=update_info.get("classes", []),
design_updates=update_info.get("design_updates", {}), # 新增design_updates处理
) )
self.design.files.append(file_model) self.design.files.append(new_file)
logger.info(f"在design.json中创建了新文件条目: {file_path}") logger.info(f"在design.json中创建了新文件条目: {file_path}")
else: else:
# 更新现有条目 # 更新现有条目使用merge_design_updates处理design_updates
if 'design_updates' in update_info:
file_model.merge_design_updates(update_info['design_updates'])
# 更新其他字段
file_model.summary = update_info.get("summary", file_model.summary) file_model.summary = update_info.get("summary", file_model.summary)
file_model.dependencies = update_info.get("dependencies", file_model.dependencies) file_model.dependencies = update_info.get(
file_model.functions = update_info.get("functions", file_model.functions) "dependencies", file_model.dependencies
)
file_model.functions = update_info.get(
"functions", file_model.functions
)
file_model.classes = update_info.get("classes", file_model.classes) file_model.classes = update_info.get("classes", file_model.classes)
logger.info(f"更新了design.json中的文件条目: {file_path}") logger.info(f"更新了design.json中的文件条目: {file_path}")
# 保存更新后的design.json # 保存更新后的design.json
design_path = self.output_dir / "design.json" design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f: with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"design.json已更新文件条目: {file_path}") logger.info(f"design.json已更新文件条目: {file_path}")
self.console.print(f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]") self.console.print(
f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]"
)
return True return True
except Exception as e: except Exception as e:
logger.error(f"更新文件条目失败: {e}") logger.error(f"更新文件条目失败: {e}")
@ -762,7 +729,9 @@ class BaseGenerator:
readme_path = self.output_dir / "README.md" readme_path = self.output_dir / "README.md"
if not readme_path.exists(): if not readme_path.exists():
logger.error(f"README.md不存在于 {self.output_dir}") logger.error(f"README.md不存在于 {self.output_dir}")
self.console.print(f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]") self.console.print(
f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]"
)
return False return False
try: try:
with open(readme_path, "r", encoding="utf-8") as f: with open(readme_path, "r", encoding="utf-8") as f:
@ -771,12 +740,14 @@ class BaseGenerator:
logger.error(f"读取README.md失败: {e}") logger.error(f"读取README.md失败: {e}")
self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]")
return False return False
# 加载design.json # 加载design.json
design_path = self.output_dir / "design.json" design_path = self.output_dir / "design.json"
if not design_path.exists(): if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}") logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]") self.console.print(
f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
)
return False return False
try: try:
with open(design_path, "r", encoding="utf-8") as f: with open(design_path, "r", encoding="utf-8") as f:
@ -786,7 +757,7 @@ class BaseGenerator:
logger.error(f"加载design.json失败: {e}") logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
return False return False
# 调用LLM比较和同步 # 调用LLM比较和同步
system_prompt = ( system_prompt = (
"你是一个软件架构师。比较README.md内容和design.json识别不一致之处并建议更新。" "你是一个软件架构师。比较README.md内容和design.json识别不一致之处并建议更新。"
@ -797,15 +768,17 @@ class BaseGenerator:
"注意仅返回JSON不要其他文本。" "注意仅返回JSON不要其他文本。"
) )
user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}" user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}"
try: try:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2) result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
needs_update = result.get("needs_update", False) needs_update = result.get("needs_update", False)
if not needs_update: if not needs_update:
logger.info("README.md和design.json已同步无需更新") logger.info("README.md和design.json已同步无需更新")
self.console.print("[green]✅ README.md和design.json已同步无需更新[/green]") self.console.print(
"[green]✅ README.md和design.json已同步无需更新[/green]"
)
return True return True
update_type = result.get("update_type", "") update_type = result.get("update_type", "")
updates = result.get("updates", {}) updates = result.get("updates", {})
if update_type == "readme": if update_type == "readme":
@ -836,10 +809,12 @@ class BaseGenerator:
self.console.print("[green]✅ README.md和design.json已同步更新[/green]") self.console.print("[green]✅ README.md和design.json已同步更新[/green]")
else: else:
logger.warning(f"未知的update_type: {update_type}") logger.warning(f"未知的update_type: {update_type}")
self.console.print(f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]") self.console.print(
f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]"
)
return False return False
return True return True
except Exception as e: except Exception as e:
logger.error(f"同步README.md失败: {e}") logger.error(f"同步README.md失败: {e}")
self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]")
return False return False

View File

@ -0,0 +1,244 @@
import json
from pathlib import Path
from typing import Optional, Dict, Any, List
import sys
from loguru import logger
from rich.console import Console
from .core import CodeGenerator
from .models import DesignModel, FileModel, FileStatus, LLMResponse
class DesignGenerator(CodeGenerator):
"""设计生成器,专门处理设计文件的生成和增量更新逻辑。"""
def __init__(
self,
api_key: Optional[str] = None,
base_url: str = "https://api.deepseek.com",
model: str = "deepseek-reasoner",
output_dir: str = "./generated",
log_file: Optional[str] = None,
max_concurrency: int = 4,
):
"""初始化设计生成器。
Args:
api_key: OpenAI API密钥默认从环境变量DEEPSEEK_APIKEY读取
base_url: API基础URL
model: 使用的模型
output_dir: 输出根目录
log_file: 日志文件路径默认自动生成
max_concurrency: 最大并发数
"""
super().__init__(
api_key=api_key,
base_url=base_url,
model=model,
output_dir=output_dir,
log_file=log_file,
max_concurrency=max_concurrency,
)
self.console = Console()
logger.info("DesignGenerator 初始化完成")
def process_design_command(self, issue_content: str, issue_type: str = "design") -> bool:
"""处理设计命令,基于工单内容实现设计文件的生成和增量更新逻辑。
包括源文件分析LLM调用和合并/覆盖处理
Args:
issue_content: 工单内容字符串
issue_type: 工单类型默认为 "design"
Returns:
bool: 是否成功处理
"""
logger.info(f"开始处理设计命令,工单类型: {issue_type}")
try:
# 1. 源文件分析:分析工单以获取变更计划
analysis_result = self._analyze_issue(issue_content, issue_type)
affected_files = analysis_result.get("affected_files", [])
design_updates = analysis_result.get("design_updates", {})
logger.debug(f"分析结果 - 受影响文件: {affected_files}, 设计更新: {design_updates}")
# 2. LLM调用和文件生成/更新
generated_files: List[str] = []
for file_info in affected_files:
file_path = file_info["path"]
action = file_info.get("action", "create")
description = file_info.get("description", "")
dependencies = file_info.get("dependencies", [])
# 检查文件现有内容以支持增量更新
existing_content = None
output_path = self.output_dir / file_path
if output_path.exists() and action == "modify":
try:
with open(output_path, "r", encoding="utf-8") as f:
existing_content = f.read()
logger.info(f"文件存在,将进行修改: {file_path}")
except Exception as e:
logger.error(f"读取现有文件失败,视为创建: {e}")
existing_content = None
elif action == "create" and output_path.exists():
logger.warning(f"文件已存在,但工单要求创建,将覆盖: {file_path}")
# 可以选择保留或覆盖,这里默认覆盖以处理增量
# 构建生成指令
if action == "create":
instruction = f"根据工单分析,创建文件 '{file_path}',描述: {description}"
else: # modify
instruction = f"根据工单分析,修改文件 '{file_path}',描述: {description}"
# 调用LLM生成代码
code, desc, commands = self.generate_file(
file_path=file_path,
prompt_instruction=instruction,
dependency_files=dependencies,
existing_content=existing_content,
output_format="full",
)
logger.info(f"生成文件完成: {file_path} - {desc}")
# 写入文件,处理合并/覆盖
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
f.write(code)
generated_files.append(file_path)
logger.info(f"已写入文件: {output_path}")
# 执行相关命令
for cmd in commands:
self.execute_command(cmd, cwd=self.output_dir)
# 3. 更新design.json处理合并/覆盖
if generated_files or design_updates:
self._update_design(generated_files, design_updates)
logger.info("design.json 已更新")
self.console.print("[green]✅ 设计文件更新完成[/green]")
logger.info("设计命令处理成功")
return True
except Exception as e:
logger.error(f"处理设计命令失败: {e}")
self.console.print(f"[bold red]❌ 处理设计命令失败: {e}[/bold red]")
return False
def analyze_source_files(self, source_dir: Path) -> Dict[str, Any]:
"""分析源代码目录以提取设计信息用于生成或刷新design.json。
Args:
source_dir: 源代码目录路径
Returns:
Dict[str, Any]: 包含提取的设计信息如文件列表依赖等
"""
logger.info(f"开始分析源代码目录: {source_dir}")
if not source_dir.exists():
logger.error(f"源代码目录不存在: {source_dir}")
raise FileNotFoundError(f"目录不存在: {source_dir}")
# 收集所有Python文件
python_files = list(source_dir.rglob("*.py"))
design_info = {"files": [], "dependencies": {}}
# 简单分析遍历文件并调用LLM提取信息可优化为并发
for file_path in python_files:
rel_path = str(file_path.relative_to(source_dir))
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# 调用LLM分析单个文件
system_prompt = (
"你是一个软件架构师。分析给定的Python文件内容返回设计信息包括摘要、依赖、函数和类。"
"返回严格的JSON对象包含summary、dependencies、functions、classes字段。"
)
user_prompt = f"文件路径: {rel_path}\n文件内容:\n{content}"
result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
# 构建文件模型
file_model = {
"path": rel_path,
"summary": result.get("summary", "自动分析生成"),
"dependencies": result.get("dependencies", []),
"functions": result.get("functions", []),
"classes": result.get("classes", []),
"design_updates": {},
}
design_info["files"].append(file_model)
design_info["dependencies"][rel_path] = file_model["dependencies"]
logger.debug(f"已分析文件: {rel_path}")
except Exception as e:
logger.error(f"分析文件 {rel_path} 失败: {e}")
# 跳过失败的文件
logger.info(f"源代码分析完成,共分析 {len(design_info['files'])} 个文件")
return design_info
def refresh_design_from_source(self, source_dir: Path) -> bool:
"""从源代码目录刷新design.json基于分析结果。
Args:
source_dir: 源代码目录路径
Returns:
bool: 是否成功刷新
"""
logger.info("开始从源代码刷新design.json")
try:
# 分析源代码
design_info = self.analyze_source_files(source_dir)
# 构建DesignModel
design = DesignModel(
project_name=self.design.project_name if self.design else "llm-codegen",
version=self.design.version if self.design else "1.0.0",
description=self.design.description if self.design else "基于大语言模型的代码生成工具",
files=[FileModel(**file) for file in design_info["files"]],
commands=self.design.commands if self.design else [],
check_tools=self.design.check_tools if self.design else [],
)
# 保存design.json
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(design.model_dump(), f, indent=2, ensure_ascii=False)
self.design = design
logger.info(f"design.json 已刷新并保存至: {design_path}")
self.console.print("[green]✅ design.json 已从源代码刷新[/green]")
return True
except Exception as e:
logger.error(f"从源代码刷新design.json失败: {e}")
self.console.print(f"[bold red]❌ 从源代码刷新design.json失败: {e}[/bold red]")
return False
def run(self, readme_path: Optional[Path] = None, issue_content: Optional[str] = None) -> None:
"""主执行流程,用于集成到命令行接口。
Args:
readme_path: README文件路径可选
issue_content: 工单内容字符串可选
"""
if readme_path:
self.readme_content = self.parse_readme(readme_path)
self.design = self.generate_design_json()
logger.info("已从README生成design.json")
if issue_content:
success = self.process_design_command(issue_content)
if success:
logger.info("设计命令处理完成")
else:
logger.error("设计命令处理失败")
sys.exit(1)
else:
logger.warning("未提供工单内容,仅生成或刷新设计文件")
if __name__ == "__main__":
# 示例用法
generator = DesignGenerator()
generator.run()

View File

@ -4,10 +4,10 @@ from typing import Any, Dict, List, Optional
from loguru import logger from loguru import logger
from rich.console import Console from rich.console import Console
from .core import BaseGenerator from .core import CodeGenerator
class EnhanceGenerator(BaseGenerator): class EnhanceGenerator(CodeGenerator):
""" """
增强生成器类继承自 BaseGenerator专门处理 enhance 命令逻辑 增强生成器类继承自 BaseGenerator专门处理 enhance 命令逻辑
用于根据需求工单feature.issue对现有项目进行功能增强 用于根据需求工单feature.issue对现有项目进行功能增强
@ -73,6 +73,7 @@ class EnhanceGenerator(BaseGenerator):
except Exception as e: except Exception as e:
logger.error(f"分析工单失败: {e}") logger.error(f"分析工单失败: {e}")
self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]")
raise e
return False return False
affected_files = analysis_result.get("affected_files", []) affected_files = analysis_result.get("affected_files", [])
@ -93,7 +94,7 @@ class EnhanceGenerator(BaseGenerator):
sorted_paths = self._topological_sort(file_paths, dependencies) sorted_paths = self._topological_sort(file_paths, dependencies)
logger.debug(f"拓扑排序结果: {sorted_paths}") logger.debug(f"拓扑排序结果: {sorted_paths}")
except ValueError as e: except ValueError as e:
logger.error(f"拓扑排序失败,检测到循环依赖: {e}") logger.error(f"拓扑排序失败,检测到循环依赖: {e}, dependencies: {dependencies}")
self.console.print(f"[bold red]❌ 拓扑排序失败,检测到循环依赖: {e}[/bold red]") self.console.print(f"[bold red]❌ 拓扑排序失败,检测到循环依赖: {e}[/bold red]")
return False return False
@ -145,6 +146,18 @@ class EnhanceGenerator(BaseGenerator):
existing_content=existing_content, existing_content=existing_content,
output_format=output_format output_format=output_format
) )
# 将代码写入文件
output_path = self.output_dir / file_path
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
with open(output_path, "w", encoding="utf-8") as f:
f.write(code)
logger.info(f"已写入文件: {output_path}")
self.console.print(f"[green]✅ 已写入文件: {file_path}[/green]")
except Exception as e:
logger.error(f"写入文件 {file_path} 失败: {e}")
self.console.print(f"[bold red]❌ 写入文件 {file_path} 失败: {e}[/bold red]")
continue
# generate_file 内部已写入文件并执行命令 # generate_file 内部已写入文件并执行命令
generated_files.append(file_path) generated_files.append(file_path)
logger.info(f"文件处理完成: {file_path} - {desc}") logger.info(f"文件处理完成: {file_path} - {desc}")

View File

@ -1,11 +1,11 @@
import json import json
from pathlib import Path from pathlib import Path
from typing import List, Optional from typing import List, Optional
from .core import BaseGenerator from .core import CodeGenerator
from .models import OutputFormat from .models import OutputFormat
class FixGenerator(BaseGenerator): class FixGenerator(CodeGenerator):
"""处理 Bug 修复逻辑的生成器类,继承自 BaseGenerator。""" """处理 Bug 修复逻辑的生成器类,继承自 BaseGenerator。"""
def __init__(self, **kwargs): def __init__(self, **kwargs):

View File

@ -42,6 +42,14 @@ class FileModel(BaseModel):
classes: List[ClassModel] = Field(default_factory=list) classes: List[ClassModel] = Field(default_factory=list)
design_updates: Dict[str, Any] = Field(default_factory=dict) design_updates: Dict[str, Any] = Field(default_factory=dict)
def merge_design_updates(self, updates: Dict[str, Any]) -> None:
"""合并设计更新到当前文件模型。
参数:
updates: 一个字典包含要合并的设计更新
"""
self.design_updates.update(updates)
class DesignModel(BaseModel): class DesignModel(BaseModel):
"""设计模型,对应 design.json 的根结构。""" """设计模型,对应 design.json 的根结构。"""
@ -90,4 +98,4 @@ class LLMResponse(BaseModel):
code: str code: str
description: str description: str
commands: List[str] = Field(default_factory=list) commands: List[str] = Field(default_factory=list)
output_format: OutputFormat = Field(default=OutputFormat.FULL, description="输出格式,可选值为 'full''diff',默认为 'full'") output_format: OutputFormat = Field(default=OutputFormat.FULL, description="输出格式,可选值为 'full''diff',默认为 'full'")