chore: 更新 .gitignore 添加更多环境与配置文件忽略项

This commit is contained in:
songsenand 2026-03-21 12:14:58 +08:00
parent 0a88ff9d6e
commit 70f795de87
16 changed files with 2903 additions and 1359 deletions

27
.gitignore vendored
View File

@@ -17,3 +17,30 @@ test_output/
*/*.egg-info */*.egg-info
/logs/* /logs/*
/llm_responses/* /llm_responses/*
/.vscode/*
/.idea/*
/.vs/*
/.vscode-server/*
/.env
/.env.local
/.env.development.local
/.env.test.local
/.env.production.local
/.env.development
/.env.test
/.env.production
/.env.example
/.env.template
/.env.default
/.env.defaults
/.env.defaults.local
/.env.defaults.development
/.env.defaults.test
/.env.defaults.production
/.env.defaults.development.local
/.env.defaults.test.local
/.env.defaults.production.local
/.env.defaults.development.example
/.env.defaults.test.example
/.env.defaults.production.example
/issues/*

View File

@ -1,4 +1,4 @@
# LLM 代码生成工具(自举版 · 增强版) # LLM 代码生成工具
本项目是一个基于大语言模型的智能代码生成与维护工具。它不仅能够根据项目 `README.md` 描述**自动生成完整的 Python 包代码**,还支持**在现有项目上增量添加功能**和**自动修复 Bug**。工具采用 `uv` 管理依赖,包含单元测试、并行检查、断点续写等特性,并通过一个**面向 LLM 的中间设计层**来提升生成质量和可维护性。 本项目是一个基于大语言模型的智能代码生成与维护工具。它不仅能够根据项目 `README.md` 描述**自动生成完整的 Python 包代码**,还支持**在现有项目上增量添加功能**和**自动修复 Bug**。工具采用 `uv` 管理依赖,包含单元测试、并行检查、断点续写等特性,并通过一个**面向 LLM 的中间设计层**来提升生成质量和可维护性。
@ -146,7 +146,7 @@ llm-codegen design project_readme.md -o ./my_design
```json ```json
{ {
"project_name": "MyProject", "project_name": "MyProject",
"version": "1.0.0", "version": "X.X.X",
"description": "项目简短描述", "description": "项目简短描述",
"files": [ "files": [
{ {
@ -174,6 +174,31 @@ llm-codegen design project_readme.md -o ./my_design
该文件由 LLM 在 `init` 阶段生成,并在后续所有操作中作为上下文提供给 LLM确保每次生成都符合整体设计。 该文件由 LLM 在 `init` 阶段生成,并在后续所有操作中作为上下文提供给 LLM确保每次生成都符合整体设计。
## 📄 README 作为设计资产
README.md 不仅是项目文档,更是工具的关键设计资产。它作为项目的起点,用于生成中间设计文件 `design.json`,并在此后的所有操作中提供上下文。
### 重要性
- **设计输入**: README.md 描述了项目的整体目标和功能,是 LLM 生成代码和设计的基础。它确保了生成的代码与项目意图保持一致。
- **一致性保障**: 通过保持 README.md 和 `design.json` 同步,工具可以基于准确的设计信息进行操作,避免生成偏差,提升维护效率。
### 同步机制
工具提供了自动同步机制来维护 README.md 和 `design.json` 的一致性:
- **`sync_readme` 方法**: 在 `CodeGenerator` 类中(位于 `src/llm_codegen/core.py``sync_readme` 方法可以比较 README.md 的内容和 `design.json` 中的描述,自动更新 `design.json` 以反映 README.md 的变更,或报告差异以供审查。
- **集成到工作流**: 同步机制可以集成到 `enhance``fix` 操作中,在生成代码前后自动运行,确保设计信息始终最新。
### 如何保持与 design.json 一致
为了确保 README.md 和 `design.json` 保持同步,建议:
1. **定期同步**: 在修改 README.md 后,运行 `llm-codegen` 工具的相关命令(如 `enhance` 或通过自定义脚本调用 `sync_readme`)来更新 `design.json`
2. **避免手动修改**: 尽量不要手动编辑 `design.json`,除非完全理解设计意图。工具生成的 `design.json` 应作为权威来源,手动修改可能导致后续操作错误。
3. **检查不一致**: 工具在运行 `enhance``fix` 时,可以自动检查 README.md 和 `design.json` 之间的不一致,并提示用户进行同步,防止设计漂移。
通过维护这种同步,项目可以持续演进,而设计资产始终保持最新和准确,确保 LLM 生成的代码始终符合项目设计。
## 🔄 核心工作流 ## 🔄 核心工作流

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@@ -1,360 +0,0 @@
#!/home/songsenand/env/.venv/bin/python
#!
"""
基于LLM的自动化代码生成工具
根据README.md文件自动生成项目文件结构并填充代码执行必要命令
"""
import json
import os
import subprocess
import sys
from typing import List, Dict, Optional, Any, Tuple
from pathlib import Path
import typer
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID
from loguru import logger
from openai import OpenAI
# ==================== Configuration ====================
# Keyword blacklist consulted by is_dangerous_command() before any shell
# command returned by the LLM is executed.
DANGEROUS_COMMANDS = ["rm", "sudo", "chmod", "dd", "mkfs", "> /dev/sda", "format"]
ALLOWED_COMMANDS = []  # Optional whitelist; when empty, only the blacklist is checked
app = typer.Typer(help="基于LLM的自动化代码生成工具")
console = Console()
# ==================== Utility functions ====================
def is_dangerous_command(cmd: str, blacklist: Optional[List[str]] = None) -> Tuple[bool, str]:
    """Check whether a shell command matches the dangerous-command blacklist.

    The original implementation used plain substring search, which produced
    false positives: "dd" flagged harmless commands such as "git add", and
    "format" flagged "formatting".  Here, multi-word entries (e.g.
    "> /dev/sda") are still matched as substrings of the whole command, but
    single-word entries are matched against whitespace-split tokens
    (ignoring a leading path like "/bin/rm" and tool suffixes like
    "mkfs.ext4").

    Args:
        cmd: The shell command line to inspect.
        blacklist: Optional custom blacklist; defaults to the module-level
            DANGEROUS_COMMANDS list (backward compatible).

    Returns:
        (is_dangerous, reason): reason is an empty string for safe commands.
    """
    if blacklist is None:
        blacklist = DANGEROUS_COMMANDS

    def _token_matches(token: str, phrase: str) -> bool:
        # Ignore a leading path component so "/usr/bin/sudo" matches "sudo";
        # accept dotted variants so "mkfs.ext4" matches "mkfs".
        base = token.rsplit("/", 1)[-1]
        return base == phrase or base.startswith(phrase + ".")

    cmd_lower = cmd.lower()
    tokens = cmd_lower.split()
    for danger in blacklist:
        phrase = danger.lower()
        if " " in phrase:
            # Phrases with whitespace can only be found by substring search.
            hit = phrase in cmd_lower
        else:
            hit = any(_token_matches(tok, phrase) for tok in tokens)
        if hit:
            return True, f"包含危险关键词 '{danger}'"
    return False, ""
# ==================== Core class ====================
class CodeGenerator:
    """Code generator encapsulating the whole workflow.

    Reads a README, asks the LLM for a file plan and per-file dependencies,
    generates each file's code, writes it under the output directory, and
    executes any follow-up shell commands the LLM suggests (after a danger
    check).
    """
    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-reasoner",
        output_dir: str = "./generated",
        log_file: Optional[str] = None,
    ):
        """
        Initialize the generator.

        Args:
            api_key: API key; falls back to the DEEPSEEK_APIKEY environment variable.
            base_url: API base URL.
            model: Model name to use.
            output_dir: Root directory for generated output (created if missing).
            log_file: Log file path; defaults to output_dir/generator.log.

        Raises:
            ValueError: If no API key is provided or found in the environment.
        """
        self.api_key = api_key or os.getenv("DEEPSEEK_APIKEY")
        if not self.api_key:
            raise ValueError("必须提供API密钥或设置环境变量DEEPSEEK_APIKEY")
        self.client = OpenAI(api_key=self.api_key, base_url=base_url)
        self.model = model
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Configure logging: console gets warnings and above, file gets full DEBUG.
        if log_file is None:
            log_file = self.output_dir / "generator.log"
        logger.remove()  # remove loguru's default handler
        # NOTE(review): original comment said "INFO and above" but the level
        # actually configured for the console is WARNING.
        logger.add(sys.stderr, level="WARNING")
        logger.add(log_file, rotation="10 MB", level="DEBUG")  # file records DEBUG
        logger.info(f"日志已初始化,保存至: {log_file}")
        self.readme_content = None  # README text; populated by run()/parse_readme()
        self.progress: Optional[Progress] = None
        self.tasks: Dict[str, TaskID] = {}  # task-ID map for rich progress bars
    def _call_llm(
        self,
        system_prompt: str,
        user_prompt: str,
        temperature: float = 0.2,
        expect_json: bool = True,
    ) -> Dict[str, Any]:
        """
        Call the LLM and return the parsed JSON response.

        Args:
            system_prompt: System-role prompt.
            user_prompt: User-role prompt.
            temperature: Sampling temperature.
            expect_json: When True, request a JSON object and parse it;
                otherwise the raw text is wrapped as {"content": text}.

        Raises:
            ValueError: If expect_json is True but the response is not valid JSON.
        """
        logger.debug(f"调用LLM模型: {self.model}")
        logger.debug(f"System: {system_prompt[:200]}...")
        logger.debug(f"User: {user_prompt[:200]}...")
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=temperature,
                response_format={"type": "json_object"} if expect_json else None,
            )
            message = response.choices[0].message
            content = message.content
            # Log the model's reasoning trace when present (e.g. reasoner models).
            if hasattr(message, "reasoning_content") and message.reasoning_content:
                logger.info(f"模型思考过程: {message.reasoning_content}")
            logger.debug(f"LLM原始响应: {content[:500]}...")
            if expect_json:
                result = json.loads(content)
            else:
                result = {"content": content}
            return result
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析失败: {e}")
            raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}")
        except Exception as e:
            logger.error(f"LLM调用失败: {e}")
            raise
    def parse_readme(self, readme_path: Path) -> str:
        """
        Read the README file content.

        If a design.json file exists next to the README, its content is
        appended so the LLM also sees the project design information.
        """
        logger.info(f"读取README文件: {readme_path}")
        try:
            with open(readme_path, "r", encoding="utf-8") as f:
                content = f.read()
            logger.debug(f"README内容长度: {len(content)} 字符")
            if (readme_path.parent / 'design.json').exists():
                with open((readme_path.parent / 'design.json')) as f:
                    content += f'\n\ndesign.json(包含项目设计有关信息)内容如下:{f.read()}'
            return content
        except Exception as e:
            logger.error(f"读取README失败: {e}")
            raise
    def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]:
        """
        Ask the LLM for the file list and dependency map based on the README.

        Returns:
            (files, dependencies):
                files: file paths to generate, in generation order.
                dependencies: mapping {file: [paths of dependency files]}.

        Raises:
            ValueError: If the LLM returns no file list.
        """
        system_prompt = (
            "你是一个软件架构师。请根据README描述分析需要生成哪些源代码文件并确定它们的生成顺序"
            "同时给出每个文件生成时最少需要读取哪些已有文件作为上下文。"
            "返回严格的JSON对象包含两个字段\n"
            "- files: 数组,按生成顺序排列的文件路径(相对于项目根目录)\n"
            "- dependencies: 对象,键为文件路径,值为该文件依赖的已有文件路径列表(可为空)\n"
            "注意:依赖文件必须是已存在的参考文件,不要包含待生成的文件。"
        )
        user_prompt = f"README内容如下\n\n{self.readme_content}"
        result = self._call_llm(system_prompt, user_prompt)
        files = result.get("files", [])
        dependencies = result.get("dependencies", {})
        if not files:
            raise ValueError("LLM未返回任何文件列表")
        logger.info(f"解析到 {len(files)} 个待生成文件")
        logger.debug(f"文件列表: {files}")
        logger.debug(f"依赖关系: {dependencies}")
        return files, dependencies
    def generate_file(
        self,
        file_path: str,
        prompt_instruction: str,
        dependency_files: List[str],
    ) -> Tuple[str, str, List[str]]:
        """
        Generate a single file; returns (code, description, commands).
        """
        # Collect dependency file contents as context for the LLM.
        context_content = []
        if self.readme_content:
            context_content.append(f"### 项目 README ###\n{self.readme_content}\n")
        for dep in dependency_files:
            dep_path = Path(dep)
            if not dep_path.exists():
                # Try to resolve relative to the output directory instead.
                alt_path = self.output_dir / dep
                if alt_path.exists():
                    dep_path = alt_path
                else:
                    # NOTE(review): logs an exception *instance* as a warning and
                    # then still falls through to open() below, which will raise
                    # FileNotFoundError for the missing path — confirm intent.
                    logger.warning(FileNotFoundError(f"依赖文件不存在: {dep}"))
            with open(dep_path, "r", encoding="utf-8") as f:
                content = f.read()
            context_content.append(f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n")
        full_context = "\n".join(context_content)
        system_prompt = (
            "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。"
            "返回严格的JSON对象包含三个字段\n"
            "- code: (string) 生成的完整代码\n"
            "- description: (string) 简短的中文功能描述\n"
            "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组"
        )
        user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}"
        result = self._call_llm(system_prompt, user_prompt)
        code = result.get("code", "")
        description = result.get("description", "")
        commands = result.get("commands", [])
        if not isinstance(commands, list):
            commands = []
        return code, description, commands
    def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> None:
        """
        Execute a single shell command after a danger check.

        Dangerous commands are blocked and logged rather than raised; command
        failures and timeouts are also logged, never propagated.
        """
        dangerous, reason = is_dangerous_command(cmd)
        if dangerous:
            logger.error(f"危险命令被阻止: {cmd},原因: {reason}")
            return
        logger.info(f"执行命令: {cmd}")
        try:
            result = subprocess.run(
                cmd,
                shell=True,
                cwd=cwd or self.output_dir,
                capture_output=True,
                text=True,
                timeout=300,  # 5-minute timeout
            )
            logger.debug(f"命令返回码: {result.returncode}")
            if result.stdout:
                logger.debug(f"stdout: {result.stdout[:500]}")
            if result.stderr:
                logger.warning(f"stderr: {result.stderr[:500]}")
        except subprocess.TimeoutExpired:
            logger.error(f"命令执行超时: {cmd}")
        except Exception as e:
            logger.error(f"命令执行失败: {e}")
    def run(self, readme_path: Path):
        """
        Main execution flow: parse README, plan files, generate each one,
        write it to disk, and run any follow-up commands.
        """
        logger.info("=" * 50)
        logger.info("开始代码生成流程")
        logger.info(f"README: {readme_path}")
        logger.info(f"输出目录: {self.output_dir}")
        # Use rich for status in the setup phase so output isn't filtered by log level.
        console.print("[bold yellow]🔍 正在解析README...[/bold yellow]")
        self.readme_content = self.parse_readme(readme_path)
        console.print("[bold yellow]📋 正在分析项目结构...[/bold yellow]")
        files, dependencies = self.get_project_structure()
        console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]")
        # Create the progress bar for the generation phase.
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
            console=console,
        ) as progress:
            self.progress = progress
            # Overall task spanning all files.
            total_task = progress.add_task("[cyan]整体进度...", total=len(files))
            # Generate each file in the planned order.
            for idx, file in enumerate(files, 1):
                logger.info(f"处理文件 [{idx}/{len(files)}]: {file}")
                # Per-file sub-task (spinner only, no total).
                file_task = progress.add_task(f"生成 {file}", total=None)
                try:
                    # Dependency files chosen by the LLM for this file.
                    deps = dependencies.get(file, [])
                    # Build the generation instruction.
                    instruction = f"请根据README描述和依赖文件生成文件 '{file}' 的完整代码。"
                    # Ask the LLM for the file's code.
                    code, desc, commands = self.generate_file(file, instruction, deps)
                    logger.info(f"生成完成: {file} - {desc}")
                    # Write the generated code to disk.
                    output_path = self.output_dir / file
                    output_path.parent.mkdir(parents=True, exist_ok=True)
                    with open(output_path, "w", encoding="utf-8") as f:
                        f.write(code)
                    logger.info(f"已写入: {output_path}")
                    # Run any follow-up commands suggested by the LLM.
                    for cmd in commands:
                        logger.info(f"准备执行命令: {cmd}")
                        self.execute_command(cmd, cwd=self.output_dir)
                except Exception as e:
                    logger.error(f"处理文件 {file} 失败: {e}")
                    # Deliberately continues with the next file instead of aborting.
                finally:
                    progress.remove_task(file_task)
                    progress.update(total_task, advance=1)
        logger.success("所有文件处理完成!")
# ==================== CLI entry point ====================
@app.command()
def main(
    readme: Path = typer.Argument(..., exists=True, file_okay=True, dir_okay=False, help="README.md文件路径"),
    output_dir: Optional[Path] = typer.Option(None, "--output", "-o", help="输出根目录默认为readme所在目录"),
    api_key: Optional[str] = typer.Option(None, "--api-key", envvar="DEEPSEEK_APIKEY", help="API密钥也可通过环境变量DEEPSEEK_APIKEY设置"),
    base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"),
    model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"),
    log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径默认输出目录下generator.log"),
):
    """
    根据README自动生成项目代码
    """
    # CLI entry: build a CodeGenerator from the parsed options and run it.
    # When no output directory is given, generate next to the README itself.
    target_dir = readme.parent if output_dir is None else output_dir
    generator = CodeGenerator(
        api_key=api_key,
        base_url=base_url,
        model=model,
        output_dir=target_dir,
        log_file=log_file,
    )
    generator.run(readme)


if __name__ == "__main__":
    app()

View File

@ -43,7 +43,7 @@ default=true
llm-codegen = "llm_codegen.cli:app" llm-codegen = "llm_codegen.cli:app"
[tool.llm-codegen] [tool.llm-codegen]
check_tools = ["pytest", "pylint", "mypy", "black"] check_tools = ["black"]
max_retries = 3 max_retries = 3
dangerous_commands = ["rm", "sudo", "chmod", "dd"] dangerous_commands = ["rm", "sudo", "chmod", "dd"]
max_concurrent_requests = 5 max_concurrent_requests = 5

View File

@ -5,15 +5,15 @@ LLM 代码生成工具的命令行接口
""" """
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional, List
import sys import sys
import traceback
import typer import typer
from rich.console import Console from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
from loguru import logger from loguru import logger
from .core import BaseGenerator
from .init_generator import InitGenerator from .init_generator import InitGenerator
from .enhance_generator import EnhanceGenerator from .enhance_generator import EnhanceGenerator
from .fix_generator import FixGenerator from .fix_generator import FixGenerator
@ -63,7 +63,7 @@ def init(
output_dir = Path.cwd() output_dir = Path.cwd()
# 初始化日志配置 # 初始化日志配置
log_file_path = init_logging(output_dir, log_file, command_name="init") init_logging(output_dir, log_file, command_name="init")
# 处理致命错误检查README文件存在性已由typer处理其他错误在try块中捕获 # 处理致命错误检查README文件存在性已由typer处理其他错误在try块中捕获
try: try:
@ -79,7 +79,6 @@ def init(
base_url=base_url, base_url=base_url,
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path,
max_concurrency=max_concurrency, max_concurrency=max_concurrency,
) )
generator.run(readme) generator.run(readme)
@ -120,7 +119,7 @@ def enhance(
output_dir = Path.cwd() output_dir = Path.cwd()
# 初始化日志配置 # 初始化日志配置
log_file_path = init_logging(output_dir, log_file, command_name="enhance") init_logging(output_dir, log_file, command_name="enhance")
# 处理致命错误检查design.json是否存在 # 处理致命错误检查design.json是否存在
design_path = output_dir / "design.json" design_path = output_dir / "design.json"
@ -142,12 +141,20 @@ def enhance(
base_url=base_url, base_url=base_url,
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path,
max_concurrency=max_concurrency, max_concurrency=max_concurrency,
) )
success = generator.process_enhance(issue_file, output_format="full") success, affected_files = generator.process_enhance(issue_file, output_format="full")
if success: if success:
progress.update(task_id, completed=1, description="增强处理完成") # 修改:成功时更新完成状态 progress.update(task_id, completed=1, description="增强处理完成") # 修改:成功时更新完成状态
generator = DesignGenerator(
api_key=api_key,
base_url=base_url,
model=model,
output_dir=str(output_dir),
max_concurrency=max_concurrency,
)
for file_path in affected_files:
generator.update_single_file_design(file_path)
else: else:
progress.update(task_id, description="增强处理失败") # 修改:失败时更新描述 progress.update(task_id, description="增强处理失败") # 修改:失败时更新描述
if not success: if not success:
@ -174,7 +181,7 @@ def fix(
base_url: str = typer.Option( base_url: str = typer.Option(
"https://api.deepseek.com", "--base-url", help="API基础URL" "https://api.deepseek.com", "--base-url", help="API基础URL"
), ),
model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), model: str = typer.Option("deepseek-chat", "--model", "-m", help="使用的模型"),
log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"),
max_concurrency: int = typer.Option( max_concurrency: int = typer.Option(
4, "--max-concurrency", help="并发生成的最大工作线程数默认4" 4, "--max-concurrency", help="并发生成的最大工作线程数默认4"
@ -185,7 +192,7 @@ def fix(
output_dir = Path.cwd() output_dir = Path.cwd()
# 初始化日志配置 # 初始化日志配置
log_file_path = init_logging(output_dir, log_file, command_name="fix") init_logging(output_dir, log_file, command_name="fix")
# 处理致命错误检查design.json是否存在 # 处理致命错误检查design.json是否存在
design_path = output_dir / "design.json" design_path = output_dir / "design.json"
@ -206,12 +213,20 @@ def fix(
base_url=base_url, base_url=base_url,
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path,
max_concurrency=max_concurrency, max_concurrency=max_concurrency,
) )
success = generator.process_fix(issue_file, output_format="full") (success, affected_files) = generator.process_fix(issue_file, output_format="full")
if success: if success:
progress.update(task_id, completed=1, description="修复处理完成") # 修改:成功时更新完成状态 progress.update(task_id, completed=1, description="修复处理完成") # 修改:成功时更新完成状态
generator = DesignGenerator(
api_key=api_key,
base_url=base_url,
model=model,
output_dir=str(output_dir),
max_concurrency=max_concurrency,
)
for file_path in affected_files:
generator.update_single_file_design(file_path)
else: else:
progress.update(task_id, description="修复处理失败") # 修改:失败时更新描述 progress.update(task_id, description="修复处理失败") # 修改:失败时更新描述
if not success: if not success:
@ -242,6 +257,14 @@ def design(
file_okay=False, file_okay=False,
dir_okay=True, dir_okay=True,
), ),
single_files: Optional[List[Path]] = typer.Option(
None,
"--single-file",
help="指定单个或多个文件路径来更新design.json中的条目可以是相对或绝对路径支持多次使用以指定多个文件保持向后兼容全量生成行为",
exists=True,
file_okay=True,
dir_okay=False,
),
output_dir: Optional[Path] = typer.Option( output_dir: Optional[Path] = typer.Option(
None, "--output", "-o", help="输出目录design.json将保存在此默认为当前目录" None, "--output", "-o", help="输出目录design.json将保存在此默认为当前目录"
), ),
@ -253,22 +276,22 @@ def design(
base_url: str = typer.Option( base_url: str = typer.Option(
"https://api.deepseek.com", "--base-url", help="API基础URL" "https://api.deepseek.com", "--base-url", help="API基础URL"
), ),
model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), model: str = typer.Option("deepseek-chat", "--model", "-m", help="使用的模型"),
log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"),
max_concurrency: int = typer.Option( max_concurrency: int = typer.Option(
4, "--max-concurrency", help="并发生成的最大工作线程数默认4" 4, "--max-concurrency", help="并发生成的最大工作线程数默认4"
), ),
): ):
"""生成或更新design.json支持从README生成、从源代码刷新并集成新的设计生成逻辑""" """生成或更新design.json支持从README生成、从源代码刷新并集成新的设计生成逻辑,新增单个或多个文件更新功能"""
if output_dir is None: if output_dir is None:
output_dir = Path.cwd() output_dir = Path.cwd()
# 初始化日志配置 # 初始化日志配置
log_file_path = init_logging(output_dir, log_file, command_name="design") init_logging(output_dir, log_file, command_name="design")
# 检查是否提供了至少一个操作参数 # 检查是否提供了至少一个操作参数
if file is None and source is None: if file is None and source is None and not single_files:
logger.error("必须提供 --file 或 --source 参数之一来执行操作。") logger.error("必须提供 --file、--source 或 --single-file 参数之一来执行操作。")
raise typer.Exit(code=1) raise typer.Exit(code=1)
# 初始化DesignGenerator以集成新的设计生成逻辑 # 初始化DesignGenerator以集成新的设计生成逻辑
@ -285,7 +308,6 @@ def design(
base_url=base_url, base_url=base_url,
model=model, model=model,
output_dir=str(output_dir), output_dir=str(output_dir),
log_file=log_file_path,
max_concurrency=max_concurrency, max_concurrency=max_concurrency,
) )
@ -294,6 +316,9 @@ def design(
# 处理--dry-run选项 # 处理--dry-run选项
if dry_run: if dry_run:
console.print("[yellow]模拟运行模式:不会实际写入文件或执行命令。[/yellow]") console.print("[yellow]模拟运行模式:不会实际写入文件或执行命令。[/yellow]")
if single_files:
for f in single_files:
console.print(f"[blue]模拟:将更新文件 {f} 在design.json中的条目[/blue]")
if file is not None: if file is not None:
logger.info(f"模拟将从README文件 {file} 生成design.json") logger.info(f"模拟将从README文件 {file} 生成design.json")
# 在dry-run模式下仅模拟解析README # 在dry-run模式下仅模拟解析README
@ -309,6 +334,19 @@ def design(
return return
# 实际运行逻辑 # 实际运行逻辑
if single_files:
# 处理单个或多个文件更新
logger.info(f"开始处理文件更新,共 {len(single_files)} 个文件")
for f in single_files:
success = generator.update_single_file_design(f)
if not success:
logger.error(f"更新文件 {f} 失败")
raise typer.Exit(code=1)
console.print("[green]✅ 所有指定文件已更新design.json[/green]")
progress.update(task_id, completed=1, description="文件更新完成")
return # 处理完文件更新后退出,不执行其他全量生成逻辑
# 全量生成行为(保持向后兼容)
if file is not None: if file is not None:
# 生成design.json从README # 生成design.json从README
if not force and design_path.exists(): if not force and design_path.exists():
@ -330,7 +368,8 @@ def design(
progress.update(task_id, completed=1, description="design命令处理完成") progress.update(task_id, completed=1, description="design命令处理完成")
console.print(f"[green]✅ design.json 已处理完成,路径: {design_path}[/green]") console.print(f"[green]✅ design.json 已处理完成,路径: {design_path}[/green]")
except Exception as e: except Exception as e:
logger.error(f"处理design命令失败: {e}") error_message = traceback.format_exc()
logger.error(f"处理design命令失败: {e}, 堆栈跟踪: {error_message}")
raise typer.Exit(code=1) raise typer.Exit(code=1)

View File

@ -1,26 +1,23 @@
import json import json
import os import os
import subprocess import subprocess
import sys
import concurrent.futures
import pendulum
from typing import List, Dict, Optional, Any, Tuple from typing import List, Dict, Optional, Any, Tuple
from pathlib import Path from pathlib import Path
from collections import deque
import threading import threading
import traceback
from rich.console import Console from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID from rich.progress import Progress, TaskID
from loguru import logger from loguru import logger
from openai import OpenAI
from .llm_client import LLMClient
from .utils import is_dangerous_command from .utils import is_dangerous_command
from .models import ( from .models import (
DesignModel, DesignModel,
StateModel, StateModel,
FileModel, FileModel,
FileStatus, )
) # 添加 FileStatus 导入 from .design_manager import DesignManager # 添加导入
class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
@ -31,8 +28,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
api_key: Optional[str] = None, api_key: Optional[str] = None,
base_url: str = "https://api.deepseek.com", base_url: str = "https://api.deepseek.com",
model: str = "deepseek-reasoner", model: str = "deepseek-reasoner",
output_dir: str = "./generated", output_dir: Path = Path("./generated"),
log_file: Optional[str] = None,
max_concurrency: int = 4, max_concurrency: int = 4,
): ):
""" """
@ -49,9 +45,10 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
if not self.api_key: if not self.api_key:
raise ValueError("必须提供API密钥或设置环境变量DEEPSEEK_APIKEY") raise ValueError("必须提供API密钥或设置环境变量DEEPSEEK_APIKEY")
self.client = OpenAI(api_key=self.api_key, base_url=base_url) self.client = LLMClient(self.api_key, base_url, model, output_dir)
self.model = model if isinstance(output_dir, str):
self.output_dir = Path(output_dir) output_dir = Path(output_dir)
self.output_dir = output_dir
self.output_dir.mkdir(parents=True, exist_ok=True) self.output_dir.mkdir(parents=True, exist_ok=True)
self.state_file = self.output_dir / ".llm_generator_state.json" self.state_file = self.output_dir / ".llm_generator_state.json"
self.console = Console() # 添加console实例用于rich打印 self.console = Console() # 添加console实例用于rich打印
@ -59,14 +56,6 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
self.max_concurrency = max_concurrency self.max_concurrency = max_concurrency
# 配置日志
if log_file is None:
log_file = self.output_dir / "generator.log"
logger.remove() # 移除默认handler
logger.add(sys.stderr, level="WARNING") # 控制台输出WARNING及以上
logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG
logger.info(f"日志已初始化,保存至: {log_file}")
self.readme_content = None self.readme_content = None
self.design: Optional[DesignModel] = None self.design: Optional[DesignModel] = None
self.state: Optional[StateModel] = None self.state: Optional[StateModel] = None
@ -79,72 +68,15 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
user_prompt: str, user_prompt: str,
temperature: float = 0.2, temperature: float = 0.2,
expect_json: bool = True, expect_json: bool = True,
) -> Dict[str, Any]: ):
""" messages = [
调用LLM并返回解析后的JSON {"role": "system", "content": system_prompt},
""" {"role": "user", "content": user_prompt},
logger.debug(f"调用LLM模型: {self.model}") ]
result = self.client.call(
try: messages=messages, temperature=temperature, expect_json=expect_json
response = self.client.chat.completions.create( )
model=self.model, return result
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
temperature=temperature,
response_format={"type": "json_object"} if expect_json else None,
)
message = response.choices[0].message
content = message.content
# 记录思考过程(如果存在)
reasoning_content = None
if hasattr(message, "reasoning_content") and message.reasoning_content:
reasoning_content = message.reasoning_content
logger.info("模型思考过程已记录")
# 创建响应目录
responses_dir = self.output_dir / "llm_responses"
responses_dir.mkdir(parents=True, exist_ok=True)
# 生成文件名(使用当前时间)
timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS")
response_file = responses_dir / f"response_{timestamp}.json"
# 保存响应到JSON文件
response_data = {
"timestamp": timestamp,
"model": self.model,
"content": content,
"reasoning_content": reasoning_content,
"system_prompt": system_prompt,
"user_prompt": user_prompt,
"temperature": temperature,
"expect_json": expect_json,
}
with open(response_file, "w", encoding="utf-8") as f:
json.dump(response_data, f, indent=2, ensure_ascii=False)
logger.debug(f"LLM原始响应: {response_file.name}")
if expect_json:
result = json.loads(content)
else:
result = {"content": content}
return result
except json.JSONDecodeError as e:
logger.error(f"JSON解析失败: {e}")
self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]")
raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}")
except Exception as e:
logger.error(f"LLM调用失败: {e}")
self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]")
raise
def parse_readme(self, readme_path: Path) -> str: def parse_readme(self, readme_path: Path) -> str:
""" """
@ -161,28 +93,6 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]")
raise raise
def generate_design_json(self) -> DesignModel:
"""
调用LLM生成design.json内容并解析为DesignModel
"""
system_prompt = (
"你是一个软件架构师。请根据README描述生成项目的中间设计文件design.json。"
"design.json应包含项目名称、版本、描述、文件列表含路径、摘要、依赖、函数和类、建议命令和检查工具。"
"返回严格的 JSON 对象符合DesignModel结构。"
)
user_prompt = f"README内容如下\n\n{self.readme_content}"
result = self._call_llm(system_prompt, user_prompt)
design_data = result
design = DesignModel(**design_data)
# 写入design.json文件
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"已生成design.json: {design_path}")
return design
def load_state(self) -> Optional[StateModel]: def load_state(self) -> Optional[StateModel]:
"""加载断点续写状态""" """加载断点续写状态"""
@ -364,9 +274,15 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
) )
user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
try: try:
result = self._call_llm(system_prompt, user_prompt) result = self.client.call(
messages=messages, temperature=0.2, expect_json=True
)
code = result.get("code") code = result.get("code")
description = result.get("description", "") description = result.get("description", "")
commands = result.get("commands", []) commands = result.get("commands", [])
@ -546,275 +462,54 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
f"工单内容:\n{issue_content}\n\n" f"工单内容:\n{issue_content}\n\n"
f"现有设计文件 (design.json):\n{design_str}" f"现有设计文件 (design.json):\n{design_str}"
) )
messages = [
result = self._call_llm(system_prompt, user_prompt, temperature=0.2) {"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
result = self.client.call(messages, expect_json=True)
return result return result
def _update_design(
self, generated_files: List[str], design_updates: Dict[str, Any]
):
"""
根据生成的变更更新 design.json
使用 FileModel 来处理文件信息
"""
updated = False
# 处理新增文件
for file_path in generated_files:
# 检查文件是否已在 design.files 中
exists = any(f.path == file_path for f in self.design.files)
if not exists:
# 获取更新信息
update_info = design_updates.get(file_path, {})
# 创建新文件条目FileModel实例
new_file = FileModel(
path=file_path,
summary=update_info.get("summary", "自动生成的新文件"),
dependencies=update_info.get("dependencies", []),
functions=update_info.get("functions", []),
classes=update_info.get("classes", []),
design_updates=update_info.get("design_updates", {}),
)
self.design.files.append(new_file)
updated = True
logger.info(f"已将新文件 {file_path} 添加到 design.json")
# 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要)
# 这里可根据实际需求扩展,当前仅处理新增文件
if updated:
# 保存更新后的 design.json
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info("design.json 已更新")
def refresh_design(self) -> bool:
"""
重新生成design.json基于当前README内容或加载的design.json
返回bool表示是否成功
"""
logger.info("开始刷新design.json")
if not self.readme_content:
# 尝试读取README.md文件
readme_path = self.output_dir / "README.md"
if readme_path.exists():
try:
self.readme_content = self.parse_readme(readme_path)
except Exception as e:
logger.error(f"读取README.md失败无法刷新design: {e}")
self.console.print(
f"[bold red]❌ 读取README.md失败无法刷新design: {e}[/bold red]"
)
return False
else:
logger.error("没有README内容且README.md文件不存在无法刷新design")
self.console.print(
"[bold red]❌ 没有README内容且README.md文件不存在无法刷新design[/bold red]"
)
return False
try:
self.design = self.generate_design_json()
logger.info("design.json已成功重新生成")
self.console.print("[green]✅ design.json已重新生成[/green]")
return True
except Exception as e:
logger.error(f"重新生成design.json失败: {e}")
self.console.print(f"[bold red]❌ 重新生成design.json失败: {e}[/bold red]")
return False
def update_file_entry(self, file_path: str, file_content: str) -> bool:
"""
更新design.json中单个文件的条目基于提供的文件内容
返回bool表示是否成功
"""
logger.info(f"开始更新design.json中文件条目: {file_path}")
if not self.design:
# 加载现有design.json
design_path = self.output_dir / "design.json"
if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(
f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
)
return False
try:
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
self.design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
return False
# 调用LLM分析文件内容返回更新信息增强以支持design_updates字段
system_prompt = (
"你是一个软件架构师。分析给定的文件内容并返回对design.json中该文件条目的更新。"
"返回严格的JSON对象包含以下字段\n"
"- summary: 文件的新摘要\n"
"- dependencies: 依赖文件列表\n"
"- functions: 函数列表每个对象有name, summary, inputs, outputs\n"
"- classes: 类列表每个对象有name, summary, methods\n"
"- design_updates: 可选,设计更新字典\n"
"注意仅返回JSON不要其他文本。"
)
# 准备当前design.json中该文件的条目信息
current_entry = None
for f in self.design.files:
if f.path == file_path:
current_entry = f.model_dump()
break
user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目如果存在:\n{json.dumps(current_entry, indent=2) if current_entry else ''}"
try:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
update_info = result
# 查找或创建文件条目
file_model = None
for f in self.design.files:
if f.path == file_path:
file_model = f
break
if file_model is None:
# 创建新条目包括design_updates
new_file = FileModel(
path=file_path,
summary=update_info.get("summary", ""),
dependencies=update_info.get("dependencies", []),
functions=update_info.get("functions", []),
classes=update_info.get("classes", []),
design_updates=update_info.get("design_updates", {}), # 新增design_updates处理
)
self.design.files.append(new_file)
logger.info(f"在design.json中创建了新文件条目: {file_path}")
else:
# 更新现有条目使用merge_design_updates处理design_updates
if 'design_updates' in update_info:
file_model.merge_design_updates(update_info['design_updates'])
# 更新其他字段
file_model.summary = update_info.get("summary", file_model.summary)
file_model.dependencies = update_info.get(
"dependencies", file_model.dependencies
)
file_model.functions = update_info.get(
"functions", file_model.functions
)
file_model.classes = update_info.get("classes", file_model.classes)
logger.info(f"更新了design.json中的文件条目: {file_path}")
# 保存更新后的design.json
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"design.json已更新文件条目: {file_path}")
self.console.print(
f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]"
)
return True
except Exception as e:
logger.error(f"更新文件条目失败: {e}")
self.console.print(f"[bold red]❌ 更新文件条目失败: {e}[/bold red]")
return False
def sync_readme(self) -> bool: def sync_readme(self) -> bool:
""" """
同步README.md和design.json确保内容一致性 同步README.md和design.json利用哈希值判断一致性并更新
返回bool表示是否成功 如果哈希不一致更新design.json中的readme_path和readme_hash以匹配当前README文件
返回bool表示是否成功
""" """
logger.info("开始同步README.md和design.json") logger.info("开始同步README.md和design.json使用哈希值判断")
# 读取README.md
readme_path = self.output_dir / "README.md" readme_path = self.output_dir / "README.md"
design_path = self.output_dir / "design.json"
# 检查文件是否存在
if not readme_path.exists(): if not readme_path.exists():
logger.error(f"README.md不存在于 {self.output_dir}") logger.error(f"README.md不存在于 {self.output_dir}")
self.console.print( self.console.print(f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]")
f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]"
)
return False return False
try:
with open(readme_path, "r", encoding="utf-8") as f:
readme_content = f.read()
except Exception as e:
logger.error(f"读取README.md失败: {e}")
self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]")
return False
# 加载design.json
design_path = self.output_dir / "design.json"
if not design_path.exists(): if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}") logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print( self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]")
f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
)
return False
try:
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
return False return False
# 调用LLM比较和同步
system_prompt = (
"你是一个软件架构师。比较README.md内容和design.json识别不一致之处并建议更新。"
"返回严格的JSON对象包含以下字段\n"
"- needs_update: bool, 是否需要更新\n"
"- update_type: 'readme''design''both', 指示哪个需要更新\n"
"- updates: 对象,描述具体的更新内容\n"
"注意仅返回JSON不要其他文本。"
)
user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}"
try: try:
result = self._call_llm(system_prompt, user_prompt, temperature=0.2) # 创建 DesignManager 实例
needs_update = result.get("needs_update", False) design_manager = DesignManager(design_path)
if not needs_update: # 加载设计模型
logger.info("README.md和design.json已同步无需更新") design = design_manager.load_design()
self.console.print( # 校验哈希
"[green]✅ README.md和design.json已同步无需更新[/green]" if design_manager.validate_readme_hash(design, readme_path):
) logger.info("README.md和design.json哈希一致无需更新")
self.console.print("[green]✅ README.md和design.json哈希一致无需更新[/green]")
return True return True
update_type = result.get("update_type", "")
updates = result.get("updates", {})
if update_type == "readme":
# 更新README.md
new_readme = updates.get("new_readme", readme_content)
with open(readme_path, "w", encoding="utf-8") as f:
f.write(new_readme)
logger.info("已更新README.md")
self.console.print("[green]✅ README.md已更新[/green]")
elif update_type == "design":
# 更新design.json
new_design_data = updates.get("new_design", design.model_dump())
design = DesignModel(**new_design_data)
with open(design_path, "w", encoding="utf-8") as f:
json.dump(new_design_data, f, indent=2, ensure_ascii=False)
logger.info("已更新design.json")
self.console.print("[green]✅ design.json已更新[/green]")
elif update_type == "both":
# 更新两者
new_readme = updates.get("new_readme", readme_content)
new_design_data = updates.get("new_design", design.model_dump())
with open(readme_path, "w", encoding="utf-8") as f:
f.write(new_readme)
design = DesignModel(**new_design_data)
with open(design_path, "w", encoding="utf-8") as f:
json.dump(new_design_data, f, indent=2, ensure_ascii=False)
logger.info("已同步更新README.md和design.json")
self.console.print("[green]✅ README.md和design.json已同步更新[/green]")
else: else:
logger.warning(f"未知的update_type: {update_type}") logger.info("README.md和design.json哈希不一致开始同步")
self.console.print( # 同步设计模型以匹配当前README
f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]" design = design_manager.sync_with_readme(design, readme_path)
) # 保存更新后的设计模型
return False design_manager.save_design(design)
return True logger.info("已更新design.json中的readme_path和readme_hash")
self.console.print("[green]✅ 已同步design.json更新了readme_path和readme_hash[/green]")
return True
except Exception as e: except Exception as e:
logger.error(f"同步README.md失败: {e}") logger.error(f"同步失败: {e}")
self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 同步失败: {e}[/bold red]")
return False return False

View File

@ -1,12 +1,194 @@
import json import json
from pathlib import Path from pathlib import Path
import pathspec
import os
import concurrent.futures
import traceback
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
import sys import sys
from loguru import logger from loguru import logger
from rich.console import Console from rich.console import Console
from .core import CodeGenerator from .core import CodeGenerator
from .models import DesignModel, FileModel, FileStatus, LLMResponse from .models import DesignModel, FileModel
from .design_manager import DesignManager
def get_non_ignored_files(root_path: Path):
    """Walk *root_path* recursively, honouring every .gitignore found along
    the way (mimicking git's behaviour), and return the paths of all files
    that are NOT ignored, relative to *root_path*.
    """
    kept = []
    # Cache of compiled ignore rules, keyed by the resolved directory that
    # owns the .gitignore file.
    spec_cache = {}

    # Pre-compile the root-level .gitignore, if any. 'gitwildmatch' is the
    # pattern dialect git itself uses.
    top_ignore = root_path / ".gitignore"
    if top_ignore.exists():
        with open(top_ignore, "r", encoding="utf-8") as fh:
            spec_cache[root_path.resolve()] = pathspec.PathSpec.from_lines("gitwildmatch", fh)

    for folder, subdirs, names in os.walk(root_path):
        here = Path(folder)

        # Register this directory's own .gitignore. The root one was already
        # loaded above, so avoid reading it a second time.
        ignore_file = here / ".gitignore"
        if ignore_file.exists() and ignore_file != top_ignore:
            with open(ignore_file, "r", encoding="utf-8") as fh:
                spec_cache[here.resolve()] = pathspec.PathSpec.from_lines(
                    "gitwildmatch", fh.readlines()
                )

        # Never descend into the VCS metadata directory.
        if ".git" in subdirs:
            subdirs.remove(".git")

        # Prune ignored sub-directories *in place* so os.walk skips them
        # entirely — this avoids visiting potentially huge ignored trees.
        subdirs[:] = [d for d in subdirs if not is_ignored(here / d, root_path, spec_cache)]

        # Keep every non-ignored file, recorded relative to the walk root.
        for name in names:
            candidate = here / name
            if not is_ignored(candidate, root_path, spec_cache):
                kept.append(candidate.relative_to(root_path))

    return kept
def is_ignored(file_path: Path, root_path: Path, specs: dict) -> bool:
    """Return True if *file_path* is ignored by any applicable .gitignore.

    Strategy: collect the ancestor directories of *file_path* (from
    *root_path* down to the file's parent) that are known to own a
    .gitignore, concatenate their rule lines in root-to-leaf order, and match
    the file against the single combined spec.  Applying the rules in that
    order lets a child directory's rules — including '!' negations —
    override its parents', which approximates git's own precedence.

    Args:
        file_path: File (or directory) to test.
        root_path: Project root that the walk started from.
        specs: Cache mapping resolved directory -> compiled PathSpec; a
            directory's presence in this mapping means it owns a .gitignore.

    Returns:
        bool: True when the combined rules ignore the file.
    """
    if not specs:
        return False

    # Resolve the root once so the upward walk terminates at the project
    # root even when callers passed a relative root_path (otherwise the
    # comparison below would only stop at the filesystem root).
    resolved_root = root_path.resolve()

    # Walk upward from the file's parent and remember every ancestor that
    # owns a .gitignore.  The chain is built leaf-first.
    chain = []
    current = file_path.resolve().parent
    while True:
        if current in specs:
            chain.append(current)
        # Stop at the project root, or at the filesystem root as a safety net.
        if current == resolved_root or current == current.parent:
            break
        current = current.parent

    # Merge the rule lines from root to leaf.  Re-reading the files (rather
    # than combining the pre-compiled specs) keeps the original line order —
    # and thus git's "last matching rule wins" semantics — intact across
    # .gitignore files.
    all_lines = []
    for spec_dir in reversed(chain):
        gitignore_path = spec_dir / ".gitignore"
        if gitignore_path.exists():
            with open(gitignore_path, "r", encoding="utf-8") as f:
                all_lines.extend(f.readlines())
    if not all_lines:
        return False

    combined_spec = pathspec.PathSpec.from_lines("gitwildmatch", all_lines)
    # gitignore patterns use forward slashes, so normalise the relative path.
    final_rel_path = file_path.relative_to(root_path).as_posix()
    return combined_spec.match_file(final_rel_path)
"""
使用示例
path = Path(".")
# 获取排除 .gitignore 后的所有文件
files = get_non_ignored_files(path)
print(f"共找到 {len(files)} 个非忽略文件。")
print("前 10 个文件示例:")
for p in files[:10]:
print(p)
# 如果你想筛选特定的后缀,例如 .py
# py_files = [p for p in files if p.suffix == '.py']
"""
class DesignGenerator(CodeGenerator): class DesignGenerator(CodeGenerator):
@ -16,9 +198,8 @@ class DesignGenerator(CodeGenerator):
self, self,
api_key: Optional[str] = None, api_key: Optional[str] = None,
base_url: str = "https://api.deepseek.com", base_url: str = "https://api.deepseek.com",
model: str = "deepseek-reasoner", model: str = "deepseek-chat",
output_dir: str = "./generated", output_dir: str = "./generated",
log_file: Optional[str] = None,
max_concurrency: int = 4, max_concurrency: int = 4,
): ):
"""初始化设计生成器。 """初始化设计生成器。
@ -36,13 +217,15 @@ class DesignGenerator(CodeGenerator):
base_url=base_url, base_url=base_url,
model=model, model=model,
output_dir=output_dir, output_dir=output_dir,
log_file=log_file,
max_concurrency=max_concurrency, max_concurrency=max_concurrency,
) )
self.console = Console() self.console = Console()
self.history_designs: Optional[DesignModel] = None
logger.info("DesignGenerator 初始化完成") logger.info("DesignGenerator 初始化完成")
def process_design_command(self, issue_content: str, issue_type: str = "design") -> bool: def process_design_command(
self, issue_content: str, issue_type: str = "design"
) -> bool:
"""处理设计命令,基于工单内容实现设计文件的生成和增量更新逻辑。 """处理设计命令,基于工单内容实现设计文件的生成和增量更新逻辑。
包括源文件分析LLM调用和合并/覆盖处理 包括源文件分析LLM调用和合并/覆盖处理
@ -60,7 +243,9 @@ class DesignGenerator(CodeGenerator):
analysis_result = self._analyze_issue(issue_content, issue_type) analysis_result = self._analyze_issue(issue_content, issue_type)
affected_files = analysis_result.get("affected_files", []) affected_files = analysis_result.get("affected_files", [])
design_updates = analysis_result.get("design_updates", {}) design_updates = analysis_result.get("design_updates", {})
logger.debug(f"分析结果 - 受影响文件: {affected_files}, 设计更新: {design_updates}") logger.debug(
f"分析结果 - 受影响文件: {affected_files}, 设计更新: {design_updates}"
)
# 2. LLM调用和文件生成/更新 # 2. LLM调用和文件生成/更新
generated_files: List[str] = [] generated_files: List[str] = []
@ -87,9 +272,13 @@ class DesignGenerator(CodeGenerator):
# 构建生成指令 # 构建生成指令
if action == "create": if action == "create":
instruction = f"根据工单分析,创建文件 '{file_path}',描述: {description}" instruction = (
f"根据工单分析,创建文件 '{file_path}',描述: {description}"
)
else: # modify else: # modify
instruction = f"根据工单分析,修改文件 '{file_path}',描述: {description}" instruction = (
f"根据工单分析,修改文件 '{file_path}',描述: {description}"
)
# 调用LLM生成代码 # 调用LLM生成代码
code, desc, commands = self.generate_file( code, desc, commands = self.generate_file(
@ -125,6 +314,66 @@ class DesignGenerator(CodeGenerator):
self.console.print(f"[bold red]❌ 处理设计命令失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 处理设计命令失败: {e}[/bold red]")
return False return False
def _analyze_single_file(self, file_path: Path, root_dir: Path) -> Dict[str, Any]:
"""分析单个文件并返回设计信息字典。
Args:
file_path: 文件的绝对路径
root_dir: 项目根目录用于计算相对路径
Returns:
Dict[str, Any]: 包含 path, summary, dependencies 等信息的字典
"""
try:
# 计算相对路径
# 注意:这里需要确保 file_path 和 root_dir 是绝对路径或相对关系正确
# 如果 file_path 已经是相对于 output_dir 的,则直接使用
if file_path.is_absolute():
rel_path = str(file_path.relative_to(root_dir))
else:
rel_path = str(file_path)
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# 调用LLM分析单个文件
system_prompt = (
"你是一个软件架构师。分析给定的Python文件内容返回设计信息。"
"请返回一个严格的JSON对象必须包含以下字段"
"1. 'summary': 文件的简短摘要(字符串,必填)。"
"2. 'dependencies': 依赖的项目内源码、资源文件的相对项目根目录的路径列表,如:['src/project_name/file.py'],如果无项目内依赖可为空列表"
"3. 'functions': 文件中定义的全局函数列表。每个函数必须是一个对象,包含:"
" - 'name': 函数名(字符串)。"
" - 'summary': 函数的简短摘要(字符串,必填)。"
" - 'inputs': 函数的输入参数列表(字符串列表)。"
" - 'outputs': 函数的输出参数列表(字符串列表)。"
"4. 'classes': 文件中定义的类列表。每个类必须是一个对象,包含:"
" - 'name': 类名(字符串)。"
" - 'summary': 类的简短摘要(字符串,必填)。"
" - 'methods': 类中定义的方法列表。每个方法必须是一个对象,包含:"
" - 'name': 方法名(字符串)。"
" - 'summary': 方法的简短摘要(字符串,必填)。"
" - 'inputs': 方法的输入参数列表(字符串列表)。"
" - 'outputs': 方法的输出参数列表(字符串列表)。"
"请确保JSON格式正确所有必填字段summary都已填写。不要输出Markdown代码块标记直接输出JSON字符串。"
)
user_prompt = f"文件路径: {rel_path}\n文件内容:\n{content}"
result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
# 构建文件模型
return {
"path": rel_path,
"summary": result.get("summary", "自动分析生成"),
"dependencies": result.get("dependencies", []),
"functions": result.get("functions", []),
"classes": result.get("classes", []),
"design_updates": {},
}
except Exception as e:
logger.error(f"分析文件 {file_path} 失败: {e}")
return None
def analyze_source_files(self, source_dir: Path) -> Dict[str, Any]: def analyze_source_files(self, source_dir: Path) -> Dict[str, Any]:
"""分析源代码目录以提取设计信息用于生成或刷新design.json。 """分析源代码目录以提取设计信息用于生成或刷新design.json。
@ -139,40 +388,45 @@ class DesignGenerator(CodeGenerator):
logger.error(f"源代码目录不存在: {source_dir}") logger.error(f"源代码目录不存在: {source_dir}")
raise FileNotFoundError(f"目录不存在: {source_dir}") raise FileNotFoundError(f"目录不存在: {source_dir}")
# 收集所有Python文件 # 确定根目录,用于计算相对路径
python_files = list(source_dir.rglob("*.py")) root_dir = (
self.output_dir
if source_dir.is_relative_to(self.output_dir)
else source_dir
)
# 收集所有需要处理的文件
files = [
p for p in get_non_ignored_files(root_dir) if p.is_relative_to(source_dir)
]
logger.info(f"共需要处理 {len(files)} 个文件")
design_info = {"files": [], "dependencies": {}} design_info = {"files": [], "dependencies": {}}
# 简单分析遍历文件并调用LLM提取信息可优化为并发 # 使用线程池并发分析文件
for file_path in python_files: # max_workers 设置为 self.max_concurrency与初始化参数保持一致
rel_path = str(file_path.relative_to(source_dir)) with concurrent.futures.ThreadPoolExecutor(
try: max_workers=self.max_concurrency
with open(file_path, "r", encoding="utf-8") as f: ) as executor:
content = f.read() # 创建 future 到 file_path 的映射,以便在出错时记录日志
future_to_path = {
executor.submit(self._analyze_single_file, p, root_dir): p
for p in files
}
# 调用LLM分析单个文件 for future in concurrent.futures.as_completed(future_to_path):
system_prompt = ( file_path = future_to_path[future]
"你是一个软件架构师。分析给定的Python文件内容返回设计信息包括摘要、依赖、函数和类。" try:
"返回严格的JSON对象包含summary、dependencies、functions、classes字段。" result = future.result()
) if result:
user_prompt = f"文件路径: {rel_path}\n文件内容:\n{content}" design_info["files"].append(result)
result = self._call_llm(system_prompt, user_prompt, temperature=0.2) design_info["dependencies"][result["path"]] = result[
"dependencies"
# 构建文件模型 ]
file_model = { logger.debug(f"已分析文件: {result['path']}")
"path": rel_path, except Exception as e:
"summary": result.get("summary", "自动分析生成"), # 这里的异常通常由 _analyze_single_file 内部捕获并返回 None
"dependencies": result.get("dependencies", []), # 但如果 _call_llm 抛出了未捕获的异常,会在这里被捕获
"functions": result.get("functions", []), logger.error(f"处理文件 {file_path} 时发生未预期的异常: {e}")
"classes": result.get("classes", []),
"design_updates": {},
}
design_info["files"].append(file_model)
design_info["dependencies"][rel_path] = file_model["dependencies"]
logger.debug(f"已分析文件: {rel_path}")
except Exception as e:
logger.error(f"分析文件 {rel_path} 失败: {e}")
# 跳过失败的文件
logger.info(f"源代码分析完成,共分析 {len(design_info['files'])} 个文件") logger.info(f"源代码分析完成,共分析 {len(design_info['files'])} 个文件")
return design_info return design_info
@ -190,44 +444,143 @@ class DesignGenerator(CodeGenerator):
try: try:
# 分析源代码 # 分析源代码
design_info = self.analyze_source_files(source_dir) design_info = self.analyze_source_files(source_dir)
design_path = self.output_dir / "design.json"
if not self.history_designs:
design_manager = DesignManager(design_path)
self.history_designs = design_manager.load_design()
# 构建DesignModel # 构建DesignModel
design = DesignModel( design = DesignModel(
project_name=self.design.project_name if self.design else "llm-codegen", project_name=self.history_designs.project_name if self.history_designs else "llm-codegen",
version=self.design.version if self.design else "1.0.0", version=self.history_designs.version if self.history_designs else "0.0.1",
description=self.design.description if self.design else "基于大语言模型的代码生成工具", description=self.history_designs.description
if self.history_designs
else "基于大语言模型的代码生成工具",
files=[FileModel(**file) for file in design_info["files"]], files=[FileModel(**file) for file in design_info["files"]],
commands=self.design.commands if self.design else [], commands=self.history_designs.commands if self.history_designs else [],
check_tools=self.design.check_tools if self.design else [], check_tools=self.history_designs.check_tools if self.history_designs else [],
) )
# 保存design.json # 保存design.json
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f: with open(design_path, "w", encoding="utf-8") as f:
json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) json.dump(design.model_dump(), f, indent=2, ensure_ascii=False)
self.design = design self.history_designs = design
logger.info(f"design.json 已刷新并保存至: {design_path}") logger.info(f"design.json 已刷新并保存至: {design_path}")
self.console.print("[green]✅ design.json 已从源代码刷新[/green]") self.console.print("[green]✅ design.json 已从源代码刷新[/green]")
return True return True
except Exception as e: except Exception as e:
logger.error(f"从源代码刷新design.json失败: {e}") error_message = traceback.format_exc()
self.console.print(f"[bold red]❌ 从源代码刷新design.json失败: {e}[/bold red]") logger.error(f"从源代码刷新design.json失败: {e}, 堆栈跟踪:\n{error_message}")
self.console.print(
f"[bold red]❌ 从源代码刷新design.json失败: {e}[/bold red]"
)
return False return False
def run(self, readme_path: Optional[Path] = None, issue_content: Optional[str] = None) -> None: def update_single_file_design(self, file_path: Path) -> bool:
"""
更新design.json中单个文件的条目基于文件内容
复用 _analyze_single_file 逻辑以确保一致性
Args:
file_path: 文件的绝对路径或相对于 output_dir 的路径
Returns:
bool: 是否成功更新
"""
logger.info(f"开始更新design.json中文件条目: {file_path}")
# 确保 design 已加载
if not self.history_designs:
design_path = self.output_dir / "design.json"
if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]")
return False
try:
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
self.history_designs = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
return False
# 确定文件绝对路径
abs_file_path = file_path if file_path.is_absolute() else (self.output_dir / file_path)
# 复用 _analyze_single_file 获取设计信息
# 注意_analyze_single_file 期望 file_path 是 Path 对象
file_info = self._analyze_single_file(abs_file_path, self.output_dir)
if not file_info:
logger.error(f"分析文件 {file_path} 失败无法更新design.json")
return False
# 查找或创建文件条目
file_model = None
for f in self.history_designs.files:
if f.path == file_info["path"]:
file_model = f
break
if file_model is None:
# 创建新条目
new_file = FileModel(**file_info)
self.history_designs.files.append(new_file)
logger.info(f"在design.json中创建了新文件条目: {file_info['path']}")
else:
# 更新现有条目
# 使用 Pydantic 的 model_update 方法或直接赋值
# 这里我们直接赋值,因为 file_info 的结构与 FileModel 匹配
file_model.summary = file_info.get("summary", file_model.summary)
file_model.dependencies = file_info.get("dependencies", file_model.dependencies)
file_model.functions = file_info.get("functions", file_model.functions)
file_model.classes = file_info.get("classes", file_model.classes)
logger.info(f"更新了design.json中的文件条目: {file_info['path']}")
# 保存更新后的design.json
design_path = self.output_dir / "design.json"
try:
with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.history_designs.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"design.json已更新文件条目: {file_info['path']}")
self.console.print(f"[green]✅ design.json中文件条目 {file_info['path']} 已更新[/green]")
return True
except Exception as e:
logger.error(f"保存design.json失败: {e}")
self.console.print(f"[bold red]❌ 保存design.json失败: {e}[/bold red]")
return False
def run(
self,
readme_path: Optional[Path] = None,
issue_content: Optional[str] = None,
update_file: Optional[Path] = None # 新增:处理单个文件更新
) -> None:
"""主执行流程,用于集成到命令行接口。 """主执行流程,用于集成到命令行接口。
Args: Args:
readme_path: README文件路径可选 readme_path: README文件路径可选
issue_content: 工单内容字符串可选 issue_content: 工单内容字符串可选
update_file: 单个文件路径用于更新design.json中对应条目可选
""" """
if readme_path: if update_file:
# 处理单个文件更新
logger.info(f"开始处理单个文件更新: {update_file}")
success = self.update_single_file_design(update_file)
if success:
logger.info(f"单个文件更新完成: {update_file}")
self.console.print(f"[green]✅ 单个文件更新完成: {update_file}[/green]")
else:
logger.error(f"单个文件更新失败: {update_file}")
self.console.print(f"[bold red]❌ 单个文件更新失败: {update_file}[/bold red]")
sys.exit(1)
elif readme_path:
self.readme_content = self.parse_readme(readme_path) self.readme_content = self.parse_readme(readme_path)
self.design = self.generate_design_json() self.history_designs = self.sync_readme()
logger.info("已从README生成design.json") logger.info("已从README生成design.json")
elif issue_content:
if issue_content:
success = self.process_design_command(issue_content) success = self.process_design_command(issue_content)
if success: if success:
logger.info("设计命令处理完成") logger.info("设计命令处理完成")
@ -235,7 +588,8 @@ class DesignGenerator(CodeGenerator):
logger.error("设计命令处理失败") logger.error("设计命令处理失败")
sys.exit(1) sys.exit(1)
else: else:
logger.warning("未提供工单内容,仅生成或刷新设计文件") logger.warning("未提供任何输入参数,仅生成或刷新设计文件")
# 可以添加默认行为,但当前为空
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -0,0 +1,136 @@
from pathlib import Path
from typing import Optional
import hashlib
import json
from .models import DesignModel
from .utils import read_file, write_file
class DesignManager:
    """Manage design.json: load/save the design model, compute and validate
    README hashes, and synchronise the design with the README file."""

    def __init__(self, design_file_path: Optional[Path] = None):
        """Initialize the manager.

        Args:
            design_file_path: Optional path to design.json; when provided,
                later methods may omit their path argument.
        """
        self.design_file_path = design_file_path

    def load_design(self, file_path: Optional[Path] = None) -> DesignModel:
        """Load design.json and parse it into a DesignModel.

        Args:
            file_path: Path to design.json; falls back to the path given at
                construction time when None.

        Returns:
            DesignModel: The parsed design model.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If no path is available, JSON parsing fails, or model
                validation fails.
        """
        if file_path is None:
            if self.design_file_path is None:
                raise ValueError("未提供 design.json 文件路径")
            file_path = self.design_file_path
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"design.json 文件不存在: {file_path}")
        content = read_file(str(file_path))
        try:
            design_dict = json.loads(content)
            # model_validate is the pydantic-v2 API; the rest of the project
            # uses model_dump(), so stay on v2 idioms (parse_obj is the
            # deprecated v1 spelling).
            return DesignModel.model_validate(design_dict)
        except json.JSONDecodeError as e:
            raise ValueError(f"JSON 解析失败: {e}") from e
        except Exception as e:
            raise ValueError(f"模型验证失败: {e}") from e

    def save_design(self, design: DesignModel, file_path: Optional[Path] = None) -> None:
        """Serialize a DesignModel to design.json.

        Args:
            design: The design model to save.
            file_path: Destination path; falls back to the path given at
                construction time when None.

        Raises:
            ValueError: If no path was provided here or at construction time.
        """
        if file_path is None:
            if self.design_file_path is None:
                raise ValueError("未提供 design.json 文件路径")
            file_path = self.design_file_path
        file_path = Path(file_path)
        # model_dump replaces the deprecated pydantic-v1 .dict() call.
        design_dict = design.model_dump(exclude_none=True)
        content = json.dumps(design_dict, indent=2, ensure_ascii=False)
        write_file(str(file_path), content)

    @staticmethod
    def compute_readme_hash(readme_path: Path) -> str:
        """Compute the SHA-256 hash of a README file.

        Args:
            readme_path: Path to the README file (e.g. README.md).

        Returns:
            str: Hex digest of the file's raw bytes.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        readme_path = Path(readme_path)
        if not readme_path.exists():
            raise FileNotFoundError(f"README 文件不存在: {readme_path}")
        with open(readme_path, 'rb') as f:
            return hashlib.sha256(f.read()).hexdigest()

    def validate_readme_hash(self, design: DesignModel, readme_path: Optional[Path] = None) -> bool:
        """Check whether the README's hash matches the one stored in design.json.

        Args:
            design: Design model holding the stored readme_hash / readme_path.
            readme_path: README path; when None, design.readme_path is used.

        Returns:
            bool: True when the hashes match; False when no stored hash or
            path exists, the file is missing, or the hashes differ.
        """
        if readme_path is None:
            if design.readme_path is None:
                return False  # no stored path: nothing to validate against
            readme_path = Path(design.readme_path)
        else:
            readme_path = Path(readme_path)
        if design.readme_hash is None:
            return False  # no stored hash: treat as invalid
        try:
            return self.compute_readme_hash(readme_path) == design.readme_hash
        except FileNotFoundError:
            return False  # README missing: validation fails

    def sync_with_readme(self, design: DesignModel, readme_path: Path) -> DesignModel:
        """Update the design's readme_path and readme_hash from the README file.

        Args:
            design: Design model to update (modified in place).
            readme_path: README file to hash.

        Returns:
            DesignModel: The same design object, updated.

        Raises:
            FileNotFoundError: If the README file does not exist.
        """
        readme_path = Path(readme_path)
        if not readme_path.exists():
            raise FileNotFoundError(f"README 文件不存在: {readme_path}")
        design.readme_path = str(readme_path)
        design.readme_hash = self.compute_readme_hash(readme_path)
        return design

View File

@ -1,162 +0,0 @@
"""Diff 应用模块,使用 unidiff2 解析和应用 unified diff 格式。"""
import os
from typing import List, Dict, Any
from unidiff import PatchSet, Hunk
def _clean_path(path: str) -> str:
    """Strip the leading 'a/' or 'b/' prefix that git adds to diff paths."""
    for prefix in ('a/', 'b/'):
        if path.startswith(prefix):
            return path[len(prefix):]
    return path
def parse_diff(diff: str) -> List[str]:
    """
    Extract the affected target-file paths from a unified diff string.

    Args:
        diff: Text in unified diff format.

    Returns:
        List of cleaned target paths (without 'a/' / 'b/' prefixes);
        duplicates are collapsed.
    """
    try:
        targets = {
            _clean_path(patch.target_file)
            for patch in PatchSet(diff)
            if patch.target_file and patch.target_file != '/dev/null'
        }
        return list(targets)
    except Exception:
        # Best effort: a diff we cannot parse simply reports no files, so
        # callers are never disturbed by malformed input.
        return []
def _apply_single_patch_to_content(file_content_lines: List[str], patch_hunks: List[Hunk]) -> List[str]:
    """
    Apply all hunks of one file's patch to its content lines.

    Hunks are applied from the bottom of the file upward so that earlier
    splices do not shift the line numbers of hunks still to be applied.

    Args:
        file_content_lines: File content as a list of lines (newlines kept).
        patch_hunks: One or more Hunk objects targeting this file.

    Returns:
        The patched content as a list of '\n'-terminated lines.

    Raises:
        ValueError: When a hunk's source context does not match the file.
    """
    lines = file_content_lines[:]
    for hunk in sorted(patch_hunks, key=lambda h: h.source_start, reverse=True):
        start = hunk.source_start - 1  # unified diffs are 1-based
        end = start + hunk.source_length
        # Trailing whitespace (including newlines) is normalised on both
        # sides before comparing, so EOL differences cause no false mismatch.
        expected = [line.value.rstrip() for line in hunk.source_lines()]
        found = [line.rstrip() for line in lines[start:end]]
        if expected != found:
            raise ValueError(f"Hunk at line {hunk.source_start} does not match source file content.")
        # Rebuild the replacement region with uniform '\n' endings.
        replacement = [line.value.rstrip() + '\n' for line in hunk.target_lines()]
        lines = lines[:start] + replacement + lines[end:]
    return lines
def apply_diff(diff: str, target_dir: str = ".") -> Dict[str, Any]:
    """
    Apply a unified diff to the files under *target_dir*.

    Args:
        diff: Text in unified diff format.
        target_dir: Directory the patched paths are resolved against
            (defaults to the current directory).

    Returns:
        A result dict with keys: success (bool), message (str),
        applied_files (cleaned target paths, filled only on full success)
        and error_details (str).
    """
    outcome = {
        'success': False,
        'message': '',
        'applied_files': [],
        'error_details': ''
    }

    # An empty diff is reported as a failure, not silently accepted.
    if not diff or diff.strip() == '':
        outcome['message'] = 'Diff string is empty'
        return outcome

    # Parse first so a malformed diff never touches the filesystem.
    try:
        patch_set = PatchSet(diff)
        touched = [
            _clean_path(patch.target_file)
            for patch in patch_set
            if patch.target_file and patch.target_file != '/dev/null'
        ]
    except Exception as exc:
        outcome['message'] = f"Failed to parse diff: {str(exc)}"
        outcome['error_details'] = str(exc)
        return outcome

    if not os.path.isdir(target_dir):
        outcome['message'] = f"Target directory does not exist: {target_dir}"
        return outcome

    try:
        for patch in patch_set:
            destination = _clean_path(patch.target_file)
            if not destination or destination == '/dev/null':
                continue  # pure deletion target: nothing to write
            dest_file = os.path.join(target_dir, destination)

            # A '/dev/null' source marks a newly created file.
            if _clean_path(patch.source_file) == '/dev/null':
                before = []
            else:
                try:
                    with open(dest_file, 'r', encoding='utf-8') as fh:
                        before = fh.readlines()
                except FileNotFoundError:
                    before = []

            after = _apply_single_patch_to_content(before, list(patch))

            os.makedirs(os.path.dirname(dest_file), exist_ok=True)
            # newline='' preserves the '\n' endings produced by the patcher.
            with open(dest_file, 'w', encoding='utf-8', newline='') as fh:
                fh.writelines(after)

        outcome['success'] = True
        outcome['message'] = 'Diff applied successfully'
        outcome['applied_files'] = touched
    except Exception as exc:
        outcome['message'] = f"Error while applying diff: {str(exc)}"
        outcome['error_details'] = str(exc)

    return outcome
# Simple self-test when this module is run as a script.
if __name__ == "__main__":
    # Example usage: rewrite one line of old_file.txt in the current directory.
    sample_diff = """--- a/old_file.txt
+++ b/new_file.txt
@@ -1 +1 @@
-Hello World
+Hello Universe
"""
    print("Testing apply_diff with unidiff2...")
    res = apply_diff(sample_diff, ".")
    print(res)

View File

@ -1,10 +1,9 @@
import json
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Optional
from loguru import logger from loguru import logger
from rich.console import Console
from .core import CodeGenerator from .core import CodeGenerator
from .design_manager import DesignManager # 添加导入
class EnhanceGenerator(CodeGenerator): class EnhanceGenerator(CodeGenerator):
@ -38,7 +37,6 @@ class EnhanceGenerator(CodeGenerator):
base_url=base_url, base_url=base_url,
model=model, model=model,
output_dir=output_dir, output_dir=output_dir,
log_file=log_file,
max_concurrency=max_concurrency max_concurrency=max_concurrency
) )
logger.info("EnhanceGenerator 初始化完成") logger.info("EnhanceGenerator 初始化完成")
@ -57,6 +55,33 @@ class EnhanceGenerator(CodeGenerator):
logger.info(f"开始处理增强工单: {issue_file_path}") logger.info(f"开始处理增强工单: {issue_file_path}")
self.console.print(f"[bold blue]🔧 处理增强工单: {issue_file_path}[/bold blue]") self.console.print(f"[bold blue]🔧 处理增强工单: {issue_file_path}[/bold blue]")
# 添加README哈希检查逻辑
try:
# 尝试加载design.json以检查README哈希
design_file_path = Path("design.json")
if not design_file_path.exists():
# 如果在当前目录找不到,尝试在输出目录中查找
design_file_path = self.output_dir / "design.json"
dm = DesignManager(design_file_path)
design = dm.load_design()
# 获取README路径
readme_path = Path(design.readme_path) if design.readme_path else Path("README.md")
# 校验哈希
if not dm.validate_readme_hash(design, readme_path):
warning_msg = f"README文件 {readme_path} 的哈希值与design.json中记录的不一致可能存在内容变更。"
logger.warning(warning_msg)
self.console.print(f"[yellow]⚠ {warning_msg}[/yellow]")
else:
logger.info("README哈希校验通过")
except Exception as e:
# 如果无法检查哈希,记录错误但不停止流程
error_msg = f"检查README哈希失败: {e}"
logger.error(error_msg)
self.console.print(f"[yellow]⚠ {error_msg}[/yellow]")
# 1. 读取工单文件内容 # 1. 读取工单文件内容
try: try:
with open(issue_file_path, 'r', encoding='utf-8') as f: with open(issue_file_path, 'r', encoding='utf-8') as f:
@ -65,7 +90,7 @@ class EnhanceGenerator(CodeGenerator):
except Exception as e: except Exception as e:
logger.error(f"读取工单文件失败: {e}") logger.error(f"读取工单文件失败: {e}")
self.console.print(f"[bold red]❌ 读取工单文件失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 读取工单文件失败: {e}[/bold red]")
return False return (False, [])
# 2. 调用 LLM 分析工单,获取变更计划 # 2. 调用 LLM 分析工单,获取变更计划
try: try:
@ -74,15 +99,14 @@ class EnhanceGenerator(CodeGenerator):
logger.error(f"分析工单失败: {e}") logger.error(f"分析工单失败: {e}")
self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]")
raise e raise e
return False return (False, [])
affected_files = analysis_result.get("affected_files", []) affected_files = analysis_result.get("affected_files", [])
design_updates = analysis_result.get("design_updates", {})
if not affected_files: if not affected_files:
logger.warning("工单分析未发现需要变更的文件") logger.warning("工单分析未发现需要变更的文件")
self.console.print("[yellow]⚠ 工单分析未发现需要变更的文件[/yellow]") self.console.print("[yellow]⚠ 工单分析未发现需要变更的文件[/yellow]")
return True # 无变更,视为成功 return (True, []) # 无变更,视为成功
logger.info(f"分析到 {len(affected_files)} 个受影响文件") logger.info(f"分析到 {len(affected_files)} 个受影响文件")
self.console.print(f"[green]📋 分析到 {len(affected_files)} 个受影响文件[/green]") self.console.print(f"[green]📋 分析到 {len(affected_files)} 个受影响文件[/green]")
@ -96,7 +120,7 @@ class EnhanceGenerator(CodeGenerator):
except ValueError as e: except ValueError as e:
logger.error(f"拓扑排序失败,检测到循环依赖: {e}, dependencies: {dependencies}") logger.error(f"拓扑排序失败,检测到循环依赖: {e}, dependencies: {dependencies}")
self.console.print(f"[bold red]❌ 拓扑排序失败,检测到循环依赖: {e}[/bold red]") self.console.print(f"[bold red]❌ 拓扑排序失败,检测到循环依赖: {e}[/bold red]")
return False return (False, [])
# 根据排序顺序获取文件信息 # 根据排序顺序获取文件信息
sorted_file_infos = [] sorted_file_infos = []
@ -168,26 +192,7 @@ class EnhanceGenerator(CodeGenerator):
# 继续处理其他文件,但记录失败 # 继续处理其他文件,但记录失败
continue continue
# 6. 更新 design.json 以反映变更
if generated_files:
try:
self._update_design(generated_files, design_updates)
logger.info(f"已更新 design.json包含 {len(generated_files)} 个文件变更")
self.console.print(f"[green]✅ 已更新 design.json[/green]")
except Exception as e:
logger.error(f"更新 design.json 失败: {e}")
self.console.print(f"[bold red]❌ 更新 design.json 失败: {e}[/bold red]")
# 不返回 False因为文件已生成仅记录错误
# 7. 可选:执行全局命令或检查(如有需要,可从 design.json 读取)
# 此处可根据设计添加,例如运行检查工具
if self.design and self.design.commands:
logger.info("开始执行项目命令")
for cmd in self.design.commands:
success = self.execute_command(cmd, cwd=self.output_dir)
if not success:
logger.warning(f"命令执行失败,但继续: {cmd}")
logger.info("增强处理流程完成") logger.info("增强处理流程完成")
self.console.print("[bold green]🎉 增强处理流程完成[/bold green]") self.console.print("[bold green]🎉 增强处理流程完成[/bold green]")
return True # 返回 True 表示成功, 返回生成的文件列表
return (True, affected_files)

View File

@ -1,8 +1,9 @@
import json from typing import List
from pathlib import Path from pathlib import Path
from typing import List, Optional
from .core import CodeGenerator from .core import CodeGenerator
from .models import OutputFormat from .models import OutputFormat
from .design_manager import DesignManager
class FixGenerator(CodeGenerator): class FixGenerator(CodeGenerator):
@ -16,7 +17,7 @@ class FixGenerator(CodeGenerator):
self, self,
bug_issue_path: Path, bug_issue_path: Path,
output_format: OutputFormat = OutputFormat.FULL, output_format: OutputFormat = OutputFormat.FULL,
) -> bool: ) -> (bool, List[Path]):
""" """
处理 fix 命令逻辑读取 Bug 工单分析变更生成并应用修复代码 处理 fix 命令逻辑读取 Bug 工单分析变更生成并应用修复代码
@ -34,7 +35,34 @@ class FixGenerator(CodeGenerator):
except Exception as e: except Exception as e:
self.logger.error(f"读取 Bug 工单文件失败: {e}") self.logger.error(f"读取 Bug 工单文件失败: {e}")
self.console.print(f"[bold red]❌ 读取 Bug 工单文件失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 读取 Bug 工单文件失败: {e}[/bold red]")
return False return (False, [])
# 添加 README 哈希检查逻辑
try:
design_file_path = self.output_dir / "design.json"
if design_file_path.exists():
design_manager = DesignManager(design_file_path)
design = design_manager.load_design()
readme_path = None
if design.readme_path:
readme_path = Path(design.readme_path)
else:
readme_path = self.output_dir / "README.md"
if readme_path and readme_path.exists():
if not design_manager.validate_readme_hash(design, readme_path):
warning_msg = "README 哈希不一致,当前内容可能与设计不符"
self.logger.warning(warning_msg)
self.console.print(f"[bold yellow]⚠ 警告: {warning_msg}[/bold yellow]")
else:
self.logger.info("README 哈希检查通过")
else:
self.logger.warning("README 文件不存在,无法进行哈希检查")
else:
self.logger.warning("design.json 文件不存在,无法检查 README 哈希")
except Exception as e:
self.logger.warning(f"检查 README 哈希时出错: {e}")
self.console.print(f"[bold yellow]⚠ 警告: 检查 README 哈希时出错: {e}[/bold yellow]")
# 调用 LLM 分析工单,获取变更计划 # 调用 LLM 分析工单,获取变更计划
try: try:
@ -42,16 +70,15 @@ class FixGenerator(CodeGenerator):
except Exception as e: except Exception as e:
self.logger.error(f"分析工单失败: {e}") self.logger.error(f"分析工单失败: {e}")
self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]") self.console.print(f"[bold red]❌ 分析工单失败: {e}[/bold red]")
return False return (False, [])
affected_files = analysis.get("affected_files", []) affected_files = analysis.get("affected_files", [])
design_updates = analysis.get("design_updates", {})
successful_files = [] successful_files = []
for file_info in affected_files: for file_info in affected_files:
file_path = file_info.get("path") file_path = file_info.get("path")
action = file_info.get("action") # 'create' 或 'modify' action = file_info.get("action") # 'create' 或 'modify'
description = file_info.get("description", "") # description = file_info.get("description", "")
dependencies = file_info.get("dependencies", []) dependencies = file_info.get("dependencies", [])
if action == "modify": if action == "modify":
@ -72,7 +99,7 @@ class FixGenerator(CodeGenerator):
# 生成修复代码 # 生成修复代码
instruction = ( instruction = (
f"根据 Bug 工单修复文件 '{file_path}'。工单摘要: {issue_content[:200]}..." f"根据 Bug 工单修复文件 '{file_path}'..."
) )
code, desc, commands = self.generate_file( code, desc, commands = self.generate_file(
file_path=file_path, file_path=file_path,
@ -103,7 +130,7 @@ class FixGenerator(CodeGenerator):
elif action == "create": elif action == "create":
# 创建新文件:无需现有内容 # 创建新文件:无需现有内容
instruction = ( instruction = (
f"根据 Bug 工单创建文件 '{file_path}'。工单摘要: {issue_content[:200]}..." f"根据 Bug 工单创建文件 '{file_path}'..."
) )
code, desc, commands = self.generate_file( code, desc, commands = self.generate_file(
file_path=file_path, file_path=file_path,
@ -129,18 +156,11 @@ class FixGenerator(CodeGenerator):
for cmd in commands: for cmd in commands:
self.execute_command(cmd, cwd=self.output_dir) self.execute_command(cmd, cwd=self.output_dir)
# 添加新条目到 design.json
self._update_design([file_path], design_updates)
# 如果有设计更新,整体更新 design.json
if design_updates and successful_files:
self._update_design(successful_files, design_updates)
if successful_files: if successful_files:
self.logger.info(f"修复成功,处理了 {len(successful_files)} 个文件") self.logger.info(f"修复成功,处理了 {len(successful_files)} 个文件")
self.console.print(f"[green]✅ 修复成功,处理了 {len(successful_files)} 个文件[/green]") self.console.print(f"[green]✅ 修复成功,处理了 {len(successful_files)} 个文件[/green]")
return True return (True, successful_files)
else: else:
self.logger.error("修复失败,没有文件被成功处理") self.logger.error("修复失败,没有文件被成功处理")
self.console.print("[bold red]❌ 修复失败,没有文件被成功处理[/bold red]") self.console.print("[bold red]❌ 修复失败,没有文件被成功处理[/bold red]")
return False return (False, [])

View File

@ -1,12 +1,13 @@
import json
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from .core import BaseGenerator from .core import CodeGenerator
from loguru import logger # 确保日志可用 from loguru import logger # 确保日志可用
from .design_generator import DesignGenerator
from .design_manager import DesignManager # 新增导入
class InitGenerator(BaseGenerator): class InitGenerator(CodeGenerator):
"""处理 init 命令的生成器类,继承自 BaseGenerator用于从 README 初始化项目。""" """处理 init 命令的生成器类,继承自 CodeGenerator用于从 README 初始化项目。"""
def __init__( def __init__(
self, self,
@ -14,7 +15,6 @@ class InitGenerator(BaseGenerator):
base_url: str = "https://api.deepseek.com", base_url: str = "https://api.deepseek.com",
model: str = "deepseek-reasoner", model: str = "deepseek-reasoner",
output_dir: str = "./generated", output_dir: str = "./generated",
log_file: Optional[str] = None,
max_concurrency: int = 4 max_concurrency: int = 4
): ):
"""初始化 InitGenerator。 """初始化 InitGenerator。
@ -27,7 +27,13 @@ class InitGenerator(BaseGenerator):
log_file: 日志文件路径 log_file: 日志文件路径
max_concurrency: 最大并发数 max_concurrency: 最大并发数
""" """
super().__init__(api_key, base_url, model, output_dir, log_file, max_concurrency) super().__init__(api_key, base_url, model, output_dir, max_concurrency)
self.design_generator = DesignGenerator(
api_key=api_key,
base_url=base_url,
output_dir=output_dir,
max_concurrency=max_concurrency
)
def run(self, readme_path: Path) -> None: def run(self, readme_path: Path) -> None:
"""处理 init 命令逻辑:根据 README.md 初始化项目。 """处理 init 命令逻辑:根据 README.md 初始化项目。
@ -36,16 +42,23 @@ class InitGenerator(BaseGenerator):
readme_path: README 文件路径 readme_path: README 文件路径
""" """
logger.info(f"开始初始化项目README路径: {readme_path}") logger.info(f"开始初始化项目README路径: {readme_path}")
self.console.print(f"[bold]🚀 开始初始化项目...[/bold]") self.console.print("[bold]🚀 开始初始化项目...[/bold]")
# 1. 读取 README # 1. 读取 README
self.readme_content = self.parse_readme(readme_path) self.readme_content = self.parse_readme(readme_path)
logger.info("README读取完成") logger.info("README读取完成")
# 2. 生成 design.json # 2. 生成 design.json
self.design = self.generate_design_json() self.design_generator.run(readme_path, None)
logger.info("design.json生成完成") logger.info("design.json生成完成")
# 计算README哈希并更新design.json
design_manager = DesignManager(Path(self.output_dir) / "design.json")
self.design = design_manager.load_design()
design_manager.sync_with_readme(self.design, readme_path)
design_manager.save_design(self.design)
logger.info("README哈希计算并存储到design.json完成")
# 3. 获取文件列表和依赖 # 3. 获取文件列表和依赖
files, dependencies = self.get_project_structure() files, dependencies = self.get_project_structure()
logger.info(f"获取到 {len(files)} 个待生成文件") logger.info(f"获取到 {len(files)} 个待生成文件")

View File

@ -0,0 +1,160 @@
import json
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from openai import OpenAI
from loguru import logger
import pendulum
class LLMClient:
    """LLM client: calls an OpenAI-compatible chat-completions endpoint.

    Adds retry with exponential backoff around each call and archives every
    request/response pair as a timestamped JSON file under
    ``<output_dir>/llm_responses`` for later inspection.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-reasoner",
        output_dir: Path = Path("./generated"),
        max_retries: int = 3,
        retry_delay: float = 1.0,
    ):
        """Initialize the LLM client.

        Args:
            api_key: API key.
            base_url: API base URL.
            model: Model name to use.
            output_dir: Output directory; response records are saved beneath it.
            max_retries: Maximum number of call attempts.
            retry_delay: Base delay in seconds for exponential backoff.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.model = model
        self.output_dir = Path(output_dir)
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
        # Every raw exchange is archived here for auditing/debugging.
        self.responses_dir = self.output_dir / "llm_responses"
        self.responses_dir.mkdir(parents=True, exist_ok=True)

    def call(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.2,
        expect_json: bool = True,
    ) -> Dict[str, Any]:
        """Call the LLM and return the parsed result.

        Args:
            messages: OpenAI-format message list, e.g.
                ``[{"role": "system", "content": "..."}, ...]``.
            temperature: Sampling temperature.
            expect_json: If True, request JSON output via ``response_format``
                and parse the reply as JSON.

        Returns:
            The parsed dict when ``expect_json`` is True; otherwise
            ``{"content": str}``.

        Raises:
            ValueError: If JSON parsing fails or the LLM returns empty content.
            Exception: The last underlying error once all retries are exhausted.
        """
        logger.debug(f"调用 LLM，模型: {self.model}，消息数: {len(messages)}")
        last_exception: Optional[Exception] = None
        for attempt in range(self.max_retries):
            try:
                # Build request arguments; response_format is only set when
                # the caller expects structured JSON output.
                kwargs: Dict[str, Any] = {
                    "model": self.model,
                    "messages": messages,
                    "temperature": temperature,
                }
                if expect_json:
                    kwargs["response_format"] = {"type": "json_object"}
                response = self.client.chat.completions.create(**kwargs)
                message = response.choices[0].message
                content = message.content
                # Reasoner-style models may expose their thinking here;
                # absent on other models, hence the getattr default.
                reasoning_content = getattr(message, "reasoning_content", None)
                # Archive the exchange before validation so that even bad
                # replies (empty / non-JSON) leave a record on disk.
                self._save_response(
                    messages=messages,
                    content=content,
                    reasoning_content=reasoning_content,
                    temperature=temperature,
                    expect_json=expect_json,
                )
                if expect_json:
                    if not content:
                        raise ValueError("LLM 返回空内容")
                    try:
                        result = json.loads(content)
                    except json.JSONDecodeError as e:
                        logger.error(f"JSON 解析失败: {e}\n原始内容: {content[:500]}")
                        raise ValueError(f"LLM 返回的不是有效 JSON: {content[:200]}") from e
                    return result
                else:
                    return {"content": content or ""}
            except Exception as e:
                last_exception = e
                logger.warning(f"LLM 调用失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    delay = self.retry_delay * (2 ** attempt)  # exponential backoff
                    logger.info(f"等待 {delay:.1f} 秒后重试...")
                    time.sleep(delay)
                else:
                    logger.error(f"LLM 调用最终失败: {e}")
                    raise last_exception
        # Unreachable in practice (the loop either returns or re-raises),
        # kept as a safeguard.
        raise last_exception or RuntimeError("LLM 调用失败，未捕获具体异常")

    def _save_response(
        self,
        messages: List[Dict[str, str]],
        content: Optional[str],
        reasoning_content: Optional[str],
        temperature: float,
        expect_json: bool,
    ) -> None:
        """Persist one LLM exchange to a timestamped JSON file.

        Args:
            messages: The messages that were sent.
            content: The returned content; may be None when the model
                returned nothing (saved before the empty-content check).
            reasoning_content: Thinking content, if the model provided any.
            temperature: Sampling temperature used for the call.
            expect_json: Whether JSON output was requested.
        """
        timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS")
        filename = f"response_{timestamp}.json"
        filepath = self.responses_dir / filename
        record = {
            "timestamp": timestamp,
            "model": self.model,
            "messages": messages,
            "content": content,
            "reasoning_content": reasoning_content,
            "temperature": temperature,
            "expect_json": expect_json,
        }
        try:
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(record, f, indent=2, ensure_ascii=False)
            logger.debug(f"LLM 响应已保存: {filepath.name}")
        except Exception as e:
            # Best-effort: a failed archive write must not break the call flow.
            logger.warning(f"保存 LLM 响应失败: {e}")

View File

@ -22,15 +22,15 @@ class FunctionModel(BaseModel):
"""函数模型,对应 design.json 中的 functions 字段。""" """函数模型,对应 design.json 中的 functions 字段。"""
name: str name: str
summary: str summary: str
inputs: List[str] inputs: List[str] = Field(default_factory=list) # 确保默认为空列表
outputs: List[str] outputs: List[str] = Field(default_factory=list) # 确保默认为空列表
class ClassModel(BaseModel): class ClassModel(BaseModel):
"""类模型,对应 design.json 中的 classes 字段。""" """类模型,对应 design.json 中的 classes 字段。"""
name: str name: str
summary: str summary: str
methods: List[str] methods: List[FunctionModel]
class FileModel(BaseModel): class FileModel(BaseModel):
@ -59,6 +59,8 @@ class DesignModel(BaseModel):
files: List[FileModel] files: List[FileModel]
commands: List[str] = Field(default_factory=list) commands: List[str] = Field(default_factory=list)
check_tools: List[str] = Field(default_factory=list) check_tools: List[str] = Field(default_factory=list)
readme_path: Optional[str] = Field(default=None, description="README文件的路径")
readme_hash: Optional[str] = Field(default=None, description="README文件的SHA256哈希值")
# 模型用于工单 # 模型用于工单
@ -92,10 +94,11 @@ class StateModel(BaseModel):
readme_path: str readme_path: str
# 可选:通用响应模型,用于 LLM 调用 # 通用响应模型,用于 LLM 调用
class LLMResponse(BaseModel): class LLMResponse(BaseModel):
"""LLM 响应模型,用于解析 generate_file 方法的返回。""" """LLM 响应模型,用于解析 generate_file 方法的返回。"""
code: str code: str
description: str description: str
commands: List[str] = Field(default_factory=list) commands: List[str] = Field(default_factory=list)
output_format: OutputFormat = Field(default=OutputFormat.FULL, description="输出格式,可选值为 'full''diff',默认为 'full'") output_format: OutputFormat = Field(default=OutputFormat.FULL, description="输出格式,可选值为 'full''diff',默认为 'full'")
reasoning_content: Optional[str] = Field(default=None, description="生成代码时的推理内容")