Revert "refactor(项目结构): 重构代码结构,拆分核心模块为独立生成器以提高可维护性"

This reverts commit 5d541fd3b8.
This commit is contained in:
songsenand 2026-03-20 21:14:56 +08:00
parent 5d541fd3b8
commit 0a88ff9d6e
12 changed files with 222 additions and 1652 deletions

View File

@ -199,6 +199,7 @@ llm-codegen design project_readme.md -o ./my_design
5. 执行检查与修复。
## 📝 工单模板
### 需求工单 (`feature.issue`)
@ -254,24 +255,6 @@ uv pip install -e ".[dev]"
## 项目结构
### 工具代码结构
本项目(LLM 代码生成工具)的代码结构已重构,核心模块被拆分为多个专门的生成器,以提高模块化和可维护性。主要模块包括:
- **src/llm_codegen/cli.py**: 命令行接口,使用 Typer 定义命令,分发到相应的生成器。
- **src/llm_codegen/core.py**: 基础生成逻辑,包含 BaseGenerator 类,提供通用方法如调用 LLM 和文件操作。
- **src/llm_codegen/init_generator.py**: 初始化命令生成器,处理 `init` 命令逻辑,继承自 BaseGenerator,负责从 README.md 生成完整项目。
- **src/llm_codegen/enhance_generator.py**: 增强命令生成器,处理 `enhance` 命令逻辑,继承自 BaseGenerator,负责根据需求工单增量添加功能。
- **src/llm_codegen/fix_generator.py**: 修复命令生成器,处理 `fix` 命令逻辑,继承自 BaseGenerator,负责根据 Bug 工单自动修复缺陷。
- **src/llm_codegen/design_generator.py**: 设计文件生成器,处理 `design` 命令逻辑,生成中间设计文件 design.json。
- **src/llm_codegen/utils.py**: 工具函数,如危险命令判断和文件操作。
- **src/llm_codegen/models.py**: 数据模型,使用 Pydantic 定义数据结构。
- **src/llm_codegen/diff_applier.py**: 应用代码差异的工具模块(如有)。
**设计思想**: 通过将核心逻辑拆分为独立的生成器每个生成器专注于一个特定任务初始化、增强、修复、设计使得代码更易于维护和扩展。BaseGenerator 提供共享功能,减少代码重复,并确保一致性。
### 生成的项目结构
生成的项目将包含以下文件和目录:
```txt
.

View File

@ -3,7 +3,7 @@
"version": "1.0.0",
"description": "一个基于大语言模型的智能代码生成与维护工具支持自动生成、增量添加功能和自动修复Bug。",
"files": [
{
{
"path": "README.md",
"summary": "项目说明文档,包含项目概述、功能介绍和使用说明",
"dependencies": [
@ -13,6 +13,7 @@
"classes": [],
"design_updates": {}
},
{
"path": "pyproject.toml",
"summary": "项目元数据、依赖配置和脚本入口",
@ -25,7 +26,7 @@
"path": "src/llm_codegen/__init__.py",
"summary": "包初始化文件",
"dependencies": [
"src/llm_codegen/core.py"
"src/llm_codegen/core.py"
],
"functions": [],
"classes": [],
@ -303,54 +304,6 @@
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "src/llm_codegen/llm_client.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "src/llm_codegen/file_operations.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "src/llm_codegen/command_executor.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "src/llm_codegen/dependency_sorter.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "src/llm_codegen/design_manager.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
},
{
"path": "src/llm_codegen/state_manager.py",
"summary": "自动生成的新文件",
"dependencies": [],
"functions": [],
"classes": [],
"design_updates": {}
}
],
"commands": [

View File

@ -31,828 +31,3 @@ acceptance_criteria:
- 日志记录、进度显示、错误提示等用户体验相关功能保持不变。
- 代码风格符合项目规范(通过 black、pylint 等检查)。
- 生成对应的单元测试。
> 原core.py内容为
```python
import json
import os
import subprocess
import sys
import concurrent.futures
import pendulum
from typing import List, Dict, Optional, Any, Tuple
from pathlib import Path
from collections import deque
import threading
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID
from loguru import logger
from openai import OpenAI
from .utils import is_dangerous_command
from .models import (
DesignModel,
StateModel,
FileModel,
FileStatus,
) # 添加 FileStatus 导入
class CodeGenerator:  # named CodeGenerator to match the design file
    """Base code generator: shared logic for the design layer, checkpoint/resume state and command execution."""
    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = "https://api.deepseek.com",
        model: str = "deepseek-reasoner",
        output_dir: str = "./generated",
        log_file: Optional[str] = None,
        max_concurrency: int = 4,
    ):
        """
        Initialize the generator.

        Args:
            api_key: API key; falls back to the DEEPSEEK_APIKEY environment variable.
            base_url: API base URL.
            model: model name to use.
            output_dir: root directory for generated output (created if missing).
            log_file: log file path; auto-generated inside output_dir when None.
            max_concurrency: maximum number of concurrent generation workers.

        Raises:
            ValueError: no API key was provided and the env var is unset.
        """
        self.api_key = api_key or os.getenv("DEEPSEEK_APIKEY")
        if not self.api_key:
            raise ValueError("必须提供API密钥或设置环境变量DEEPSEEK_APIKEY")
        self.client = OpenAI(api_key=self.api_key, base_url=base_url)
        self.model = model
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.state_file = self.output_dir / ".llm_generator_state.json"
        self.console = Console()  # rich console for user-facing output
        self._state_lock = threading.Lock()  # serializes writes to the state file
        self.max_concurrency = max_concurrency
        # Logging: WARNING+ to stderr, full DEBUG detail to a rotating file.
        if log_file is None:
            log_file = self.output_dir / "generator.log"
        logger.remove()  # drop loguru's default handler
        logger.add(sys.stderr, level="WARNING")  # console gets WARNING and above
        logger.add(log_file, rotation="10 MB", level="DEBUG")  # file gets DEBUG
        logger.info(f"日志已初始化,保存至: {log_file}")
        self.readme_content = None
        self.design: Optional[DesignModel] = None
        self.state: Optional[StateModel] = None
        self.progress: Optional[Progress] = None
        self.tasks: Dict[str, TaskID] = {}  # maps task key -> rich progress TaskID
    def _call_llm(
        self,
        system_prompt: str,
        user_prompt: str,
        temperature: float = 0.2,
        expect_json: bool = True,
    ) -> Dict[str, Any]:
        """
        Call the LLM and return the parsed JSON payload.

        Every request/response pair (including any reasoning trace exposed by
        the provider) is archived under <output_dir>/llm_responses.

        Args:
            system_prompt: system message sent to the model.
            user_prompt: user message sent to the model.
            temperature: sampling temperature.
            expect_json: request and parse a JSON object when True; otherwise
                the raw text is returned under the "content" key.

        Returns:
            Parsed JSON dict, or {"content": raw_text} when expect_json is False.

        Raises:
            ValueError: the model reply was not valid JSON.
        """
        logger.debug(f"调用LLM模型: {self.model}")
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=temperature,
                response_format={"type": "json_object"} if expect_json else None,
            )
            message = response.choices[0].message
            content = message.content
            # Capture the model's reasoning trace when the provider exposes one.
            reasoning_content = None
            if hasattr(message, "reasoning_content") and message.reasoning_content:
                reasoning_content = message.reasoning_content
                logger.info("模型思考过程已记录")
            # Archive the full exchange for later inspection/debugging.
            responses_dir = self.output_dir / "llm_responses"
            responses_dir.mkdir(parents=True, exist_ok=True)
            # Timestamped name keeps successive responses distinct.
            timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS")
            response_file = responses_dir / f"response_{timestamp}.json"
            response_data = {
                "timestamp": timestamp,
                "model": self.model,
                "content": content,
                "reasoning_content": reasoning_content,
                "system_prompt": system_prompt,
                "user_prompt": user_prompt,
                "temperature": temperature,
                "expect_json": expect_json,
            }
            with open(response_file, "w", encoding="utf-8") as f:
                json.dump(response_data, f, indent=2, ensure_ascii=False)
            logger.debug(f"LLM原始响应: {response_file.name}")
            if expect_json:
                result = json.loads(content)
            else:
                result = {"content": content}
            return result
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析失败: {e}")
            self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]")
            raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}")
        except Exception as e:
            logger.error(f"LLM调用失败: {e}")
            self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]")
            raise
def parse_readme(self, readme_path: Path) -> str:
"""
读取README文件内容
"""
logger.info(f"读取README文件: {readme_path}")
try:
with open(readme_path, "r", encoding="utf-8") as f:
content = f.read()
logger.debug(f"README内容长度: {len(content)} 字符")
return content
except Exception as e:
logger.error(f"读取README失败: {e}")
self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]")
raise
    def generate_design_json(self) -> DesignModel:
        """
        Ask the LLM to produce design.json from the README and parse it into a DesignModel.

        Side effect: writes design.json into output_dir.

        Returns:
            The parsed DesignModel instance.
        """
        system_prompt = (
            "你是一个软件架构师。请根据README描述生成项目的中间设计文件design.json。"
            "design.json应包含项目名称、版本、描述、文件列表含路径、摘要、依赖、函数和类、建议命令和检查工具。"
            "返回严格的 JSON 对象符合DesignModel结构。"
        )
        user_prompt = f"README内容如下\n\n{self.readme_content}"
        result = self._call_llm(system_prompt, user_prompt)
        design_data = result
        design = DesignModel(**design_data)
        # Persist the design document next to the generated project files.
        design_path = self.output_dir / "design.json"
        with open(design_path, "w", encoding="utf-8") as f:
            json.dump(design.model_dump(), f, indent=2, ensure_ascii=False)
        logger.info(f"已生成design.json: {design_path}")
        return design
    def load_state(self) -> Optional[StateModel]:
        """
        Load the checkpoint/resume state from disk.

        Returns:
            The StateModel (also stored on self.state) when the state file
            exists and parses; None when absent or unreadable.
        """
        if self.state_file.exists():
            try:
                with open(self.state_file, "r", encoding="utf-8") as f:
                    state_data = json.load(f)
                self.state = StateModel(**state_data)
                logger.info(
                    f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个"
                )
                return self.state
            except Exception as e:
                # A corrupt state file is treated as "no state" rather than fatal.
                logger.error(f"加载状态失败: {e}")
                self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]")
                return None
        return None
    def save_state(
        self, generated_files: List[str], dependencies_map: Dict[str, List[str]]
    ) -> None:
        """
        Persist checkpoint state for resume; thread-safe for concurrent generation.

        Args:
            generated_files: file paths already generated.
            dependencies_map: mapping of file path -> dependency paths.
        """
        with self._state_lock:  # serialize concurrent writers
            state = StateModel(
                current_file_index=0,
                generated_files=generated_files,
                dependencies_map=dependencies_map,
                total_files=len(self.design.files) if self.design else 0,
                output_dir=str(self.output_dir),
                # NOTE(review): stores a README *content* prefix in a field named
                # readme_path — looks like a path/content mix-up; confirm intent.
                readme_path=self.readme_content[:100] if self.readme_content else "",
            )
            with open(self.state_file, "w", encoding="utf-8") as f:
                json.dump(state.model_dump(), f, indent=2, ensure_ascii=False)
            logger.debug(f"状态已保存: {self.state_file}")
def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]:
"""
从design.json获取文件列表和依赖关系
Returns:
(files, dependencies)
files: 按顺序需要生成的文件路径列表
dependencies: 字典 {file: [依赖文件路径]}
"""
if not self.design:
raise ValueError("design.json未加载请先调用generate_design_json")
files = [file.path for file in self.design.files]
dependencies = {file.path: file.dependencies for file in self.design.files}
logger.info(f"从design.json解析到 {len(files)} 个待生成文件")
logger.debug(f"文件列表: {files}")
logger.debug(f"依赖关系: {dependencies}")
return files, dependencies
def _add_implicit_dependencies(
self, files: List[str], dependencies: Dict[str, List[str]]
) -> Dict[str, List[str]]:
"""
添加隐式依赖关系,基于文件路径和常见模式
Args:
files: 文件路径列表
dependencies: 原始依赖字典
Returns:
Dict[str, List[str]]: 增强后的依赖字典
"""
enhanced = dependencies.copy()
for file in files:
if file not in enhanced:
enhanced[file] = []
# 添加同一目录下的其他文件作为隐式依赖(简单示例)
path = Path(file)
implicit_deps = [
f
for f in files
if f != file
and Path(f).parent == path.parent
and f not in enhanced[file]
]
if implicit_deps:
enhanced[file].extend(implicit_deps)
logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}")
return enhanced
    def generate_file(
        self,
        file_path: str,
        prompt_instruction: str,
        dependency_files: List[str],
        existing_content: Optional[str] = None,
        output_format: str = "full",  # reserved; only 'full' is handled today
    ) -> Tuple[str, str, List[str]]:
        """
        Generate (or modify) a single file via the LLM.

        Builds a context from the README, design.json, readable dependency
        files and — in modify mode — the file's current content, then asks the
        model for the complete new file body.

        Args:
            file_path: target file path.
            prompt_instruction: generation instruction for the model.
            dependency_files: dependency paths included as context.
            existing_content: current file content; when given, the model is
                asked to modify rather than create.
            output_format: output format selector from models.py; currently
                only 'full' is meaningful.

        Returns:
            (code, description, commands). On LLM failure a placeholder code
            string and empty command list are returned instead of raising.
        """
        # Assemble the context sections the model will see.
        context_content = []
        if self.readme_content:
            context_content.append(f"### 项目 README ###\n{self.readme_content}\n")
        # design.json, when present, anchors the model to the agreed structure.
        design_path = self.output_dir / "design.json"
        if design_path.exists():
            try:
                with open(design_path, "r", encoding="utf-8") as f:
                    design_content = f.read()
                context_content.append(
                    f"### 设计文件: design.json ###\n{design_content}\n"
                )
            except Exception as e:
                logger.error(f"读取design.json失败: {e}")
                self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]")
                # Continue with whatever context we have so far.
        # Add each dependency file that exists (also checking under output_dir).
        for dep in dependency_files:
            dep_path = Path(dep)
            if not dep_path.exists():
                alt_path = self.output_dir / dep
                if alt_path.exists():
                    dep_path = alt_path
                else:
                    logger.warning(f"依赖文件不存在,已跳过: {dep}")
                    self.console.print(
                        f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]"
                    )
                    continue
            try:
                with open(dep_path, "r", encoding="utf-8") as f:
                    content = f.read()
                context_content.append(
                    f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n"
                )
            except Exception as e:
                logger.error(f"读取依赖文件 {dep} 失败: {e}")
                self.console.print(
                    f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]"
                )
                # Unreadable dependency: skip it.
        # In modify mode the current body is part of the context too.
        if existing_content is not None:
            context_content.append(
                f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n"
            )
        full_context = "\n".join(context_content)
        # Prompt differs between "modify existing file" and "create new file".
        if existing_content is not None:
            system_prompt = (
                "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。"
                "返回严格的 JSON 对象,包含四个字段:\n"
                "- code: (string) 修改后的完整代码\n"
                "- description: (string) 简短的中文修改描述\n"
                "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n"
                "- output_format: (string) 应为 'full'"
            )
        else:
            system_prompt = (
                "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。"
                "返回严格的 JSON 对象,包含四个字段:\n"
                "- code: (string) 生成的完整代码\n"
                "- description: (string) 简短的中文功能描述\n"
                "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n"
                "- output_format: (string) 应为 'full'"
            )
        user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}"
        try:
            result = self._call_llm(system_prompt, user_prompt)
            code = result.get("code")
            description = result.get("description", "")
            commands = result.get("commands", [])
            result.get("output_format", "full")  # NOTE(review): value discarded — dead call?
            if code is None:
                raise ValueError("LLM 响应中没有 code 字段")
            return code, description, commands
        except Exception as e:
            logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}")
            self.console.print(
                f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]"
            )
            # Degrade gracefully so batch generation can continue.
            return "# 生成失败,请检查日志", "生成失败,发生错误", []
    def _generate_file_task(
        self, file_path: str, dependencies: List[str], generated_files: set
    ) -> Tuple[bool, str]:
        """
        Worker used by the concurrent generator to produce one file.

        Generates the code, writes it under output_dir and runs any follow-up
        commands the model requested; command failures do not fail the task.

        Args:
            file_path: file to generate.
            dependencies: declared dependencies of the file.
            generated_files: set of already-generated paths (usable as context).

        Returns:
            (success, error message or empty string).
        """
        try:
            instruction = (
                f"请根据README描述和依赖文件生成文件 '{file_path}' 的完整代码。"
            )
            # Only feed the model dependencies that have already been generated.
            available_deps = [dep for dep in dependencies if dep in generated_files]
            code, desc, commands = self.generate_file(
                file_path, instruction, available_deps
            )
            logger.info(f"生成完成: {file_path} - {desc}")
            # Write the generated code under output_dir.
            output_path = self.output_dir / file_path
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(code)
            logger.info(f"已写入: {output_path}")
            # Run any post-generation commands; failures are logged, not fatal.
            for cmd in commands:
                logger.info(f"准备执行命令: {cmd}")
                success = self.execute_command(cmd, cwd=self.output_dir)
                if not success:
                    logger.warning(f"命令执行失败,但继续处理: {cmd}")
            return True, ""
        except Exception as e:
            logger.error(f"生成文件 {file_path} 失败: {e}")
            return False, str(e)
def _topological_sort(
self, files: List[str], dependencies: Dict[str, List[str]]
) -> List[str]:
"""
对文件列表进行拓扑排序,基于依赖关系。
返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。
如果检测到循环依赖抛出ValueError。
"""
from collections import deque
# 初始化入度和反向邻接表
in_degree = {f: 0 for f in files}
rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f
# 构建图如果文件f依赖于dep则增加f的入度并将f加入rev_graph[dep]
for f in files:
for dep in dependencies.get(f, []):
if dep in files: # 只考虑在files中的依赖
in_degree[f] += 1 # f依赖于dep所以f的入度增加
rev_graph[dep].append(f) # dep被f依赖
# 队列初始化为入度为0的文件无依赖的文件
queue = deque([f for f in files if in_degree[f] == 0])
sorted_files = []
while queue:
node = queue.popleft()
sorted_files.append(node)
# 所有依赖于node的文件入度减1
for dependent in rev_graph[node]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
# 检查是否所有文件都已排序(无循环依赖)
if len(sorted_files) != len(files):
raise ValueError(
f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。"
)
return sorted_files
    def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool:
        """
        Run a single shell command with a danger screen and 5-minute timeout.

        Failures are logged and reported on the console but never raised.

        Args:
            cmd: command string (executed through the shell).
            cwd: working directory; defaults to output_dir.

        Returns:
            True when the command ran and exited with code 0.
        """
        dangerous, reason = is_dangerous_command(cmd)
        if dangerous:
            logger.error(f"危险命令被阻止: {cmd},原因: {reason}")
            self.console.print(
                f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]"
            )
            return False
        logger.info(f"执行命令: {cmd}")
        try:
            result = subprocess.run(
                cmd,
                shell=True,
                cwd=cwd or self.output_dir,
                capture_output=True,
                text=True,
                timeout=300,  # 5-minute timeout
            )
            logger.debug(f"命令返回码: {result.returncode}")
            # Log truncated output so huge command output can't flood the log.
            if result.stdout:
                logger.debug(f"stdout: {result.stdout[:500]}")
            if result.stderr:
                logger.warning(f"stderr: {result.stderr[:500]}")
            if result.returncode != 0:
                logger.error(f"命令执行失败,返回码: {result.returncode}")
                self.console.print(
                    f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]"
                )
                return False
            return True
        except subprocess.TimeoutExpired:
            logger.error(f"命令执行超时: {cmd}")
            self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]")
            return False
        except Exception as e:
            logger.error(f"命令执行失败: {e}")
            self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]")
            return False
    def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]:
        """
        Ask the LLM to analyze an issue ticket and return a structured change plan.

        Loads design.json into self.design on demand so the model sees the
        current project design as context.

        Args:
            issue_content: raw text of the ticket.
            issue_type: ticket category string (e.g. feature vs bug).

        Returns:
            Parsed plan dict with "affected_files" and optional "design_updates".

        Raises:
            Exception: re-raised when design.json cannot be loaded.
        """
        system_prompt = (
            "你是一个软件架构师。根据用户提供的工单内容和现有项目设计文件design.json"
            "分析需要进行的代码变更。返回严格的 JSON 对象,包含以下字段:\n"
            "- affected_files: 数组,每个元素为一个对象,包含:\n"
            " - path: 文件路径(相对于项目根目录)\n"
            " - action: 'create' 或 'modify'\n"
            " - description: 对此文件变更的简短描述\n"
            " - dependencies: 此文件可能依赖的其他文件路径列表(可选)\n"
            "- design_updates: 对象,描述对 design.json 的更新,例如新增的文件条目、修改的摘要等(可选)\n"
            "注意:仅返回 JSON不要包含其他文本。"
        )
        # Lazily load design.json so the prompt includes the current design.
        if not self.design:
            design_path = self.output_dir / "design.json"
            try:
                with open(design_path, "r", encoding="utf-8") as f:
                    design_data = json.load(f)
                self.design = DesignModel(**design_data)
            except Exception as e:
                logger.error(f"加载design.json失败: {e}")
                self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
                raise e
        design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False)
        user_prompt = (
            f"工单类型: {issue_type}\n"
            f"工单内容:\n{issue_content}\n\n"
            f"现有设计文件 (design.json):\n{design_str}"
        )
        result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
        return result
    def _update_design(
        self, generated_files: List[str], design_updates: Dict[str, Any]
    ):
        """
        Merge newly generated files into design.json.

        Only files not yet present in self.design.files are appended, using any
        per-path metadata provided in design_updates; the file on disk is
        rewritten only when something changed.

        Args:
            generated_files: paths produced by the last generation run.
            design_updates: per-path metadata overrides from the LLM plan.
        """
        updated = False
        for file_path in generated_files:
            # Skip paths already tracked by the design document.
            exists = any(f.path == file_path for f in self.design.files)
            if not exists:
                update_info = design_updates.get(file_path, {})
                # Build a new FileModel entry from the provided metadata.
                new_file = FileModel(
                    path=file_path,
                    summary=update_info.get("summary", "自动生成的新文件"),
                    dependencies=update_info.get("dependencies", []),
                    functions=update_info.get("functions", []),
                    classes=update_info.get("classes", []),
                    design_updates=update_info.get("design_updates", {}),
                )
                self.design.files.append(new_file)
                updated = True
                logger.info(f"已将新文件 {file_path} 添加到 design.json")
        # NOTE: only additions are handled here; updating existing entries from
        # design_updates is left as a future extension.
        if updated:
            design_path = self.output_dir / "design.json"
            with open(design_path, "w", encoding="utf-8") as f:
                json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
            logger.info("design.json 已更新")
    def refresh_design(self) -> bool:
        """
        Regenerate design.json from the current README content.

        Falls back to reading README.md from output_dir when no README content
        has been loaded yet.

        Returns:
            True on success, False otherwise (errors are logged, not raised).
        """
        logger.info("开始刷新design.json")
        if not self.readme_content:
            # No README in memory: try to read README.md from output_dir.
            readme_path = self.output_dir / "README.md"
            if readme_path.exists():
                try:
                    self.readme_content = self.parse_readme(readme_path)
                except Exception as e:
                    logger.error(f"读取README.md失败无法刷新design: {e}")
                    self.console.print(
                        f"[bold red]❌ 读取README.md失败无法刷新design: {e}[/bold red]"
                    )
                    return False
            else:
                logger.error("没有README内容且README.md文件不存在无法刷新design")
                self.console.print(
                    "[bold red]❌ 没有README内容且README.md文件不存在无法刷新design[/bold red]"
                )
                return False
        try:
            self.design = self.generate_design_json()
            logger.info("design.json已成功重新生成")
            self.console.print("[green]✅ design.json已重新生成[/green]")
            return True
        except Exception as e:
            logger.error(f"重新生成design.json失败: {e}")
            self.console.print(f"[bold red]❌ 重新生成design.json失败: {e}[/bold red]")
            return False
    def update_file_entry(self, file_path: str, file_content: str) -> bool:
        """
        Refresh one file's entry in design.json based on its current content.

        The LLM analyzes the file and returns a new summary/dependencies/
        functions/classes block, which is merged into (or appended to)
        self.design and written back to disk.

        Args:
            file_path: path of the file whose entry should be updated.
            file_content: full current content of that file.

        Returns:
            True on success, False otherwise (errors are logged, not raised).
        """
        logger.info(f"开始更新design.json中文件条目: {file_path}")
        if not self.design:
            # Lazily load design.json the first time we need it.
            design_path = self.output_dir / "design.json"
            if not design_path.exists():
                logger.error(f"design.json不存在于 {self.output_dir}")
                self.console.print(
                    f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
                )
                return False
            try:
                with open(design_path, "r", encoding="utf-8") as f:
                    design_data = json.load(f)
                self.design = DesignModel(**design_data)
            except Exception as e:
                logger.error(f"加载design.json失败: {e}")
                self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
                return False
        # Ask the LLM for the updated entry (including optional design_updates).
        system_prompt = (
            "你是一个软件架构师。分析给定的文件内容并返回对design.json中该文件条目的更新。"
            "返回严格的JSON对象包含以下字段\n"
            "- summary: 文件的新摘要\n"
            "- dependencies: 依赖文件列表\n"
            "- functions: 函数列表每个对象有name, summary, inputs, outputs\n"
            "- classes: 类列表每个对象有name, summary, methods\n"
            "- design_updates: 可选,设计更新字典\n"
            "注意仅返回JSON不要其他文本。"
        )
        # Include the current entry (if any) so the model updates, not invents.
        current_entry = None
        for f in self.design.files:
            if f.path == file_path:
                current_entry = f.model_dump()
                break
        user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目如果存在:\n{json.dumps(current_entry, indent=2) if current_entry else '无'}"
        try:
            result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
            update_info = result
            # Locate the existing entry, if any.
            file_model = None
            for f in self.design.files:
                if f.path == file_path:
                    file_model = f
                    break
            if file_model is None:
                # No entry yet: append a brand new one (including design_updates).
                new_file = FileModel(
                    path=file_path,
                    summary=update_info.get("summary", ""),
                    dependencies=update_info.get("dependencies", []),
                    functions=update_info.get("functions", []),
                    classes=update_info.get("classes", []),
                    design_updates=update_info.get("design_updates", {}),  # carry design_updates through
                )
                self.design.files.append(new_file)
                logger.info(f"在design.json中创建了新文件条目: {file_path}")
            else:
                # Merge design_updates first, then overwrite the plain fields.
                if 'design_updates' in update_info:
                    file_model.merge_design_updates(update_info['design_updates'])
                file_model.summary = update_info.get("summary", file_model.summary)
                file_model.dependencies = update_info.get(
                    "dependencies", file_model.dependencies
                )
                file_model.functions = update_info.get(
                    "functions", file_model.functions
                )
                file_model.classes = update_info.get("classes", file_model.classes)
                logger.info(f"更新了design.json中的文件条目: {file_path}")
            # Persist the modified design document.
            design_path = self.output_dir / "design.json"
            with open(design_path, "w", encoding="utf-8") as f:
                json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
            logger.info(f"design.json已更新文件条目: {file_path}")
            self.console.print(
                f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]"
            )
            return True
        except Exception as e:
            logger.error(f"更新文件条目失败: {e}")
            self.console.print(f"[bold red]❌ 更新文件条目失败: {e}[/bold red]")
            return False
    def sync_readme(self) -> bool:
        """
        Reconcile README.md with design.json via an LLM comparison.

        The model decides whether the README, the design, or both need to be
        rewritten and supplies the replacement content.

        Returns:
            True when already in sync or successfully updated; False on error
            or when the model returns an unknown update_type.
        """
        logger.info("开始同步README.md和design.json")
        # Read README.md from output_dir.
        readme_path = self.output_dir / "README.md"
        if not readme_path.exists():
            logger.error(f"README.md不存在于 {self.output_dir}")
            self.console.print(
                f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]"
            )
            return False
        try:
            with open(readme_path, "r", encoding="utf-8") as f:
                readme_content = f.read()
        except Exception as e:
            logger.error(f"读取README.md失败: {e}")
            self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]")
            return False
        # Load design.json.
        design_path = self.output_dir / "design.json"
        if not design_path.exists():
            logger.error(f"design.json不存在于 {self.output_dir}")
            self.console.print(
                f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
            )
            return False
        try:
            with open(design_path, "r", encoding="utf-8") as f:
                design_data = json.load(f)
            design = DesignModel(**design_data)
        except Exception as e:
            logger.error(f"加载design.json失败: {e}")
            self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
            return False
        # Ask the LLM to diff the two documents and propose updates.
        system_prompt = (
            "你是一个软件架构师。比较README.md内容和design.json识别不一致之处并建议更新。"
            "返回严格的JSON对象包含以下字段\n"
            "- needs_update: bool, 是否需要更新\n"
            "- update_type: 'readme' 或 'design' 或 'both', 指示哪个需要更新\n"
            "- updates: 对象,描述具体的更新内容\n"
            "注意仅返回JSON不要其他文本。"
        )
        user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}"
        try:
            result = self._call_llm(system_prompt, user_prompt, temperature=0.2)
            needs_update = result.get("needs_update", False)
            if not needs_update:
                logger.info("README.md和design.json已同步无需更新")
                self.console.print(
                    "[green]✅ README.md和design.json已同步无需更新[/green]"
                )
                return True
            update_type = result.get("update_type", "")
            updates = result.get("updates", {})
            if update_type == "readme":
                # Rewrite README.md only.
                new_readme = updates.get("new_readme", readme_content)
                with open(readme_path, "w", encoding="utf-8") as f:
                    f.write(new_readme)
                logger.info("已更新README.md")
                self.console.print("[green]✅ README.md已更新[/green]")
            elif update_type == "design":
                # Rewrite design.json only (validated through DesignModel first).
                new_design_data = updates.get("new_design", design.model_dump())
                design = DesignModel(**new_design_data)
                with open(design_path, "w", encoding="utf-8") as f:
                    json.dump(new_design_data, f, indent=2, ensure_ascii=False)
                logger.info("已更新design.json")
                self.console.print("[green]✅ design.json已更新[/green]")
            elif update_type == "both":
                # Rewrite both documents.
                new_readme = updates.get("new_readme", readme_content)
                new_design_data = updates.get("new_design", design.model_dump())
                with open(readme_path, "w", encoding="utf-8") as f:
                    f.write(new_readme)
                design = DesignModel(**new_design_data)
                with open(design_path, "w", encoding="utf-8") as f:
                    json.dump(new_design_data, f, indent=2, ensure_ascii=False)
                logger.info("已同步更新README.md和design.json")
                self.console.print("[green]✅ README.md和design.json已同步更新[/green]")
            else:
                logger.warning(f"未知的update_type: {update_type}")
                self.console.print(
                    f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]"
                )
                return False
            return True
        except Exception as e:
            logger.error(f"同步README.md失败: {e}")
            self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]")
            return False
```

View File

@ -13,6 +13,7 @@ from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
from loguru import logger
from .core import BaseGenerator
from .init_generator import InitGenerator
from .enhance_generator import EnhanceGenerator
from .fix_generator import FixGenerator

View File

@ -1,51 +0,0 @@
import subprocess
import os
from typing import Optional, Tuple
from loguru import logger
from .utils import is_dangerous_command
class CommandExecutor:
    """
    Runs system commands, refusing any command flagged as dangerous.
    """

    def __init__(self):
        pass

    def execute(self, cmd: str, cwd: Optional[str] = None) -> Tuple[bool, str]:
        """
        Execute a shell command after a safety screen.

        Args:
            cmd: command string to run.
            cwd: working directory (current directory when None).

        Returns:
            Tuple[bool, str]: (success flag, stdout on success / error message on failure).
        """
        # Refuse anything the safety helper flags as dangerous.
        blocked, reason = is_dangerous_command(cmd)
        if blocked:
            logger.warning(f"危险命令被拦截: {cmd}, 原因: {reason}")
            return False, f"命令危险被拦截: {reason}"
        try:
            completed = subprocess.run(
                cmd,
                shell=True,  # run through the shell so plain command strings work
                cwd=cwd,
                capture_output=True,
                text=True,
                encoding='utf-8'
            )
        except Exception as e:
            logger.error(f"执行命令时发生异常: {cmd}, 异常: {e}")
            return False, str(e)
        if completed.returncode != 0:
            logger.error(f"命令执行失败: {cmd}, 错误: {completed.stderr}")
            return False, completed.stderr
        logger.info(f"命令执行成功: {cmd}")
        return True, completed.stdout

View File

@ -14,25 +14,17 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID
from loguru import logger
from openai import OpenAI
from .utils import is_dangerous_command, read_file as utils_read_file, write_file as utils_write_file, ensure_dir, safe_join, log_error, is_fatal_error, build_dependency_graph, compute_in_degrees, topological_sort as utils_topological_sort, create_progress_bar
from .utils import is_dangerous_command
from .models import (
DesignModel,
StateModel,
FileModel,
FileStatus,
LLMResponse,
OutputFormat
)
from .llm_client import LLMClient
from .file_operations import handle_llm_response, generate_diff, apply_diff
from .command_executor import CommandExecutor
from .dependency_sorter import topological_sort as dependency_topological_sort, detect_cycles
from .design_manager import DesignManager
from .state_manager import StateManager
) # 添加 FileStatus 导入
class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
"""代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行,使用组合模式集成模块"""
"""代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行"""
def __init__(
self,
@ -57,11 +49,14 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
if not self.api_key:
raise ValueError("必须提供API密钥或设置环境变量DEEPSEEK_APIKEY")
self.client = OpenAI(api_key=self.api_key, base_url=base_url)
self.model = model
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.state_file = self.output_dir / ".llm_generator_state.json"
self.console = Console() # 添加console实例用于rich打印
self._state_lock = threading.Lock()
self.max_concurrency = max_concurrency
# 配置日志
@ -72,18 +67,6 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG
logger.info(f"日志已初始化,保存至: {log_file}")
# 初始化模块实例(组合模式)
self.llm_client = LLMClient(
api_key=self.api_key,
model=model,
base_url=base_url if base_url != "https://api.deepseek.com" else None, # LLMClient 处理默认URL
log_level="INFO"
)
self.command_executor = CommandExecutor()
self.design_manager = DesignManager(design_file_path=str(self.output_dir / "design.json"))
self.state_manager = StateManager(state_file_path=str(self.state_file))
# file_operations 和 dependency_sorter 作为函数模块直接使用
self.readme_content = None
self.design: Optional[DesignModel] = None
self.state: Optional[StateModel] = None
@ -98,24 +81,66 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
expect_json: bool = True,
) -> Dict[str, Any]:
"""
调用LLM并返回解析后的JSON使用 LLMClient 模块
调用LLM并返回解析后的JSON
"""
logger.debug(f"调用LLM模型: {self.llm_client.model}")
logger.debug(f"调用LLM模型: {self.model}")
try:
llm_response = self.llm_client.call(
system_prompt=system_prompt,
user_prompt=user_prompt,
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
temperature=temperature,
expect_json=expect_json
response_format={"type": "json_object"} if expect_json else None,
)
# 转换为字典以保持接口兼容
result = {
"code": llm_response.code,
"description": llm_response.description,
"commands": llm_response.commands,
"output_format": llm_response.output_format.value
message = response.choices[0].message
content = message.content
# 记录思考过程(如果存在)
reasoning_content = None
if hasattr(message, "reasoning_content") and message.reasoning_content:
reasoning_content = message.reasoning_content
logger.info("模型思考过程已记录")
# 创建响应目录
responses_dir = self.output_dir / "llm_responses"
responses_dir.mkdir(parents=True, exist_ok=True)
# 生成文件名(使用当前时间)
timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS")
response_file = responses_dir / f"response_{timestamp}.json"
# 保存响应到JSON文件
response_data = {
"timestamp": timestamp,
"model": self.model,
"content": content,
"reasoning_content": reasoning_content,
"system_prompt": system_prompt,
"user_prompt": user_prompt,
"temperature": temperature,
"expect_json": expect_json,
}
with open(response_file, "w", encoding="utf-8") as f:
json.dump(response_data, f, indent=2, ensure_ascii=False)
logger.debug(f"LLM原始响应: {response_file.name}")
if expect_json:
result = json.loads(content)
else:
result = {"content": content}
return result
except json.JSONDecodeError as e:
logger.error(f"JSON解析失败: {e}")
self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]")
raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}")
except Exception as e:
logger.error(f"LLM调用失败: {e}")
self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]")
@ -123,11 +148,12 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
def parse_readme(self, readme_path: Path) -> str:
"""
读取README文件内容使用 file_operations 模块
读取README文件内容
"""
logger.info(f"读取README文件: {readme_path}")
try:
content = utils_read_file(str(readme_path))
with open(readme_path, "r", encoding="utf-8") as f:
content = f.read()
logger.debug(f"README内容长度: {len(content)} 字符")
return content
except Exception as e:
@ -150,30 +176,35 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
design_data = result
design = DesignModel(**design_data)
# 保存design.json文件
self.design_manager.save_design(design)
logger.info(f"已生成design.json: {self.design_manager.design_file_path}")
# 写入design.json文件
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"已生成design.json: {design_path}")
return design
def load_state(self) -> Optional[StateModel]:
"""加载断点续写状态,使用 StateManager 模块"""
try:
self.state = self.state_manager.read_state()
if self.state:
"""加载断点续写状态"""
if self.state_file.exists():
try:
with open(self.state_file, "r", encoding="utf-8") as f:
state_data = json.load(f)
self.state = StateModel(**state_data)
logger.info(
f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)}"
)
return self.state
except Exception as e:
logger.error(f"加载状态失败: {e}")
self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]")
return None
return self.state
except Exception as e:
logger.error(f"加载状态失败: {e}")
self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]")
return None
return None
def save_state(
self, generated_files: List[str], dependencies_map: Dict[str, List[str]]
) -> None:
"""保存断点续写状态,适应并发生成(线程安全),使用 StateManager 模块"""
"""保存断点续写状态,适应并发生成(线程安全)"""
with self._state_lock: # 串行化写入
state = StateModel(
current_file_index=0,
@ -183,7 +214,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
output_dir=str(self.output_dir),
readme_path=self.readme_content[:100] if self.readme_content else "",
)
self.state_manager.write_state(state)
with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(state.model_dump(), f, indent=2, ensure_ascii=False)
logger.debug(f"状态已保存: {self.state_file}")
def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]:
@ -196,11 +228,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
dependencies: 字典 {file: [依赖文件路径]}
"""
if not self.design:
# 尝试加载设计
try:
self.design = self.design_manager.load_design()
except Exception as e:
raise ValueError(f"design.json未加载或加载失败: {e}")
raise ValueError("design.json未加载请先调用generate_design_json")
files = [file.path for file in self.design.files]
dependencies = {file.path: file.dependencies for file in self.design.files}
@ -270,7 +298,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
design_path = self.output_dir / "design.json"
if design_path.exists():
try:
design_content = utils_read_file(str(design_path))
with open(design_path, "r", encoding="utf-8") as f:
design_content = f.read()
context_content.append(
f"### 设计文件: design.json ###\n{design_content}\n"
)
@ -294,7 +323,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
continue
try:
content = utils_read_file(str(dep_path))
with open(dep_path, "r", encoding="utf-8") as f:
content = f.read()
context_content.append(
f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n"
)
@ -336,19 +366,13 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}"
try:
llm_response = self.llm_client.call(
system_prompt=system_prompt,
user_prompt=user_prompt,
temperature=0.2,
expect_json=True
)
code = llm_response.code
description = llm_response.description
commands = llm_response.commands
# 使用 handle_llm_response 处理文件写入,支持不同输出格式
output_path = str(self.output_dir / file_path)
handle_llm_response(output_path, llm_response)
logger.info(f"文件已生成并写入: {output_path}")
result = self._call_llm(system_prompt, user_prompt)
code = result.get("code")
description = result.get("description", "")
commands = result.get("commands", [])
result.get("output_format", "full")
if code is None:
raise ValueError("LLM 响应中没有 code 字段")
return code, description, commands
except Exception as e:
logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}")
@ -383,6 +407,13 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
)
logger.info(f"生成完成: {file_path} - {desc}")
# 写入文件
output_path = self.output_dir / file_path
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
f.write(code)
logger.info(f"已写入: {output_path}")
# 执行命令
for cmd in commands:
logger.info(f"准备执行命令: {cmd}")
@ -402,16 +433,39 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
返回排序后的列表满足每个文件的依赖项都出现在该文件之前
如果检测到循环依赖抛出ValueError
"""
try:
sorted_files = dependency_topological_sort(dependencies)
# 确保所有文件都在排序列表中
if set(sorted_files) != set(files):
logger.warning("依赖图可能不完整,调整排序列表")
sorted_files = [f for f in files if f in sorted_files] + [f for f in files if f not in sorted_files]
return sorted_files
except ValueError as e:
logger.error(f"拓扑排序失败: {e}")
raise
from collections import deque
# 初始化入度和反向邻接表
in_degree = {f: 0 for f in files}
rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f
# 构建图如果文件f依赖于dep则增加f的入度并将f加入rev_graph[dep]
for f in files:
for dep in dependencies.get(f, []):
if dep in files: # 只考虑在files中的依赖
in_degree[f] += 1 # f依赖于dep所以f的入度增加
rev_graph[dep].append(f) # dep被f依赖
# 队列初始化为入度为0的文件无依赖的文件
queue = deque([f for f in files if in_degree[f] == 0])
sorted_files = []
while queue:
node = queue.popleft()
sorted_files.append(node)
# 所有依赖于node的文件入度减1
for dependent in rev_graph[node]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
# 检查是否所有文件都已排序(无循环依赖)
if len(sorted_files) != len(files):
raise ValueError(
f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。"
)
return sorted_files
def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool:
"""
@ -420,11 +474,44 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
Returns:
bool: 命令是否成功执行
"""
success, output = self.command_executor.execute(cmd, cwd=str(cwd) if cwd else None)
if not success:
logger.error(f"命令执行失败: {cmd}, 输出: {output}")
self.console.print(f"[bold red]❌ 命令执行失败: {cmd}[/bold red]")
return success
dangerous, reason = is_dangerous_command(cmd)
if dangerous:
logger.error(f"危险命令被阻止: {cmd},原因: {reason}")
self.console.print(
f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]"
)
return False
logger.info(f"执行命令: {cmd}")
try:
result = subprocess.run(
cmd,
shell=True,
cwd=cwd or self.output_dir,
capture_output=True,
text=True,
timeout=300, # 5分钟超时
)
logger.debug(f"命令返回码: {result.returncode}")
if result.stdout:
logger.debug(f"stdout: {result.stdout[:500]}")
if result.stderr:
logger.warning(f"stderr: {result.stderr[:500]}")
if result.returncode != 0:
logger.error(f"命令执行失败,返回码: {result.returncode}")
self.console.print(
f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]"
)
return False
return True
except subprocess.TimeoutExpired:
logger.error(f"命令执行超时: {cmd}")
self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]")
return False
except Exception as e:
logger.error(f"命令执行失败: {e}")
self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]")
return False
def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]:
"""
@ -444,8 +531,11 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
# 将现有 design.json 内容作为上下文的一部分
if not self.design:
design_path = self.output_dir / "design.json"
try:
self.design = self.design_manager.load_design()
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
self.design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
@ -495,7 +585,9 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
if updated:
# 保存更新后的 design.json
self.design_manager.save_design(self.design)
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info("design.json 已更新")
def refresh_design(self) -> bool:
@ -541,8 +633,17 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
logger.info(f"开始更新design.json中文件条目: {file_path}")
if not self.design:
# 加载现有design.json
design_path = self.output_dir / "design.json"
if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(
f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
)
return False
try:
self.design = self.design_manager.load_design()
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
self.design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
@ -605,7 +706,9 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
logger.info(f"更新了design.json中的文件条目: {file_path}")
# 保存更新后的design.json
self.design_manager.save_design(self.design)
design_path = self.output_dir / "design.json"
with open(design_path, "w", encoding="utf-8") as f:
json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False)
logger.info(f"design.json已更新文件条目: {file_path}")
self.console.print(
f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]"
@ -631,15 +734,25 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
)
return False
try:
readme_content = utils_read_file(str(readme_path))
with open(readme_path, "r", encoding="utf-8") as f:
readme_content = f.read()
except Exception as e:
logger.error(f"读取README.md失败: {e}")
self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]")
return False
# 加载design.json
design_path = self.output_dir / "design.json"
if not design_path.exists():
logger.error(f"design.json不存在于 {self.output_dir}")
self.console.print(
f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]"
)
return False
try:
design = self.design_manager.load_design()
with open(design_path, "r", encoding="utf-8") as f:
design_data = json.load(f)
design = DesignModel(**design_data)
except Exception as e:
logger.error(f"加载design.json失败: {e}")
self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]")
@ -671,23 +784,27 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
if update_type == "readme":
# 更新README.md
new_readme = updates.get("new_readme", readme_content)
utils_write_file(str(readme_path), new_readme)
with open(readme_path, "w", encoding="utf-8") as f:
f.write(new_readme)
logger.info("已更新README.md")
self.console.print("[green]✅ README.md已更新[/green]")
elif update_type == "design":
# 更新design.json
new_design_data = updates.get("new_design", design.model_dump())
design = DesignModel(**new_design_data)
self.design_manager.save_design(design)
with open(design_path, "w", encoding="utf-8") as f:
json.dump(new_design_data, f, indent=2, ensure_ascii=False)
logger.info("已更新design.json")
self.console.print("[green]✅ design.json已更新[/green]")
elif update_type == "both":
# 更新两者
new_readme = updates.get("new_readme", readme_content)
new_design_data = updates.get("new_design", design.model_dump())
utils_write_file(str(readme_path), new_readme)
with open(readme_path, "w", encoding="utf-8") as f:
f.write(new_readme)
design = DesignModel(**new_design_data)
self.design_manager.save_design(design)
with open(design_path, "w", encoding="utf-8") as f:
json.dump(new_design_data, f, indent=2, ensure_ascii=False)
logger.info("已同步更新README.md和design.json")
self.console.print("[green]✅ README.md和design.json已同步更新[/green]")
else:
@ -701,18 +818,3 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件
logger.error(f"同步README.md失败: {e}")
self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]")
return False
def run(self, readme_path: Path) -> None:
    """
    Main execution flow driving the whole generation process (placeholder;
    the concrete behavior depends on subclasses or the caller).

    The public interface is kept stable; internal logic may change.

    Args:
        readme_path: Path to the README file the project is generated from.
    """
    # Example flow: read the README, produce the design, then generate files.
    self.readme_content = self.parse_readme(readme_path)
    self.design = self.generate_design_json()
    files, dependencies = self.get_project_structure()
    # Simplified: generate every file directly, passing each file's
    # dependency list (empty when the file has none).
    for file_path in files:
        instruction = f"生成文件 {file_path}"
        self.generate_file(file_path, instruction, dependencies.get(file_path, []))
    logger.info("生成完成")
logger.info("生成完成")

View File

@ -1,114 +0,0 @@
"""
Module for dependency sorting and cycle detection.
Provides functions to perform topological sort and detect cycles in a dependency graph.
"""
from collections import defaultdict, deque
def topological_sort(dependencies):
    """
    Topologically sort a dependency graph so that every node appears
    *after* all of its dependencies.

    Uses Kahn's algorithm: repeatedly emit nodes whose remaining dependency
    count is zero, then release their dependents.  (The previous
    implementation pointed the edges the wrong way and emitted dependents
    before their dependencies, the reverse of the generation order the
    callers require.)

    Args:
        dependencies (dict): Maps a node (e.g., a file path) to the list of
            nodes it depends on.
            Example: {"src/llm_codegen/core.py": ["src/llm_codegen/utils.py"], ...}
            Nodes that only appear as dependencies (never as keys) are
            treated as having no dependencies of their own.

    Returns:
        list: Nodes ordered so that each node's dependencies precede it.

    Raises:
        ValueError: If a cycle is detected in the graph, with details of the cycle.
    """
    # Edges run dependency -> dependent; a node's in-degree is the number of
    # dependencies it is still waiting on.
    dependents = defaultdict(list)
    in_degree = {}
    for node, deps in dependencies.items():
        in_degree.setdefault(node, 0)
        for dep in deps:
            in_degree.setdefault(dep, 0)
            dependents[dep].append(node)
            in_degree[node] += 1

    # Start with the nodes that have no dependencies at all.
    queue = deque(node for node, degree in in_degree.items() if degree == 0)
    sorted_nodes = []
    while queue:
        node = queue.popleft()
        sorted_nodes.append(node)
        for dependent in dependents[node]:
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                queue.append(dependent)

    # Any node left unsorted is part of (or downstream of) a cycle.
    if len(sorted_nodes) != len(in_degree):
        cycle = _detect_cycle_dfs(dependencies)
        raise ValueError(f"Cycle detected in dependency graph: {cycle}")
    return sorted_nodes
def _detect_cycle_dfs(dependencies):
    """
    Find one cycle in a dependency graph via depth-first search.

    Args:
        dependencies (dict): Same shape as in ``topological_sort``.

    Returns:
        list: The nodes forming the first cycle found (the entry node is
        repeated at both ends), or an empty list when the graph is acyclic.
    """
    adjacency = defaultdict(list)
    for node, deps in dependencies.items():
        adjacency[node] = list(deps)

    visited = set()
    on_stack = set()

    def walk(current, trail):
        """Return the cycle reachable from *current* as a list, else None."""
        visited.add(current)
        on_stack.add(current)
        trail.append(current)
        for nxt in adjacency.get(current, []):
            if nxt not in visited:
                found = walk(nxt, trail)
                if found is not None:
                    return found
            elif nxt in on_stack:
                # Back edge: the cycle runs from the earlier occurrence of
                # nxt in the current trail back to itself.
                return trail[trail.index(nxt):] + [nxt]
        on_stack.remove(current)
        trail.pop()
        return None

    for start in adjacency:
        if start not in visited:
            found = walk(start, [])
            if found is not None:
                return found
    return []
def detect_cycles(dependencies):
    """
    Check a dependency graph for cycles.

    Args:
        dependencies (dict): Same shape as in ``topological_sort``.

    Returns:
        tuple: ``(has_cycle, cycle_nodes)`` — ``has_cycle`` is a bool and
        ``cycle_nodes`` lists the cycle when one exists (empty otherwise).
    """
    found = _detect_cycle_dfs(dependencies)
    return (True, found) if found else (False, [])

View File

@ -1,106 +0,0 @@
import json
from pathlib import Path
from typing import Optional, Dict, Any
from .models import DesignModel, FileModel
class DesignManager:
    """Manage loading, saving, updating and syncing of design.json."""

    def __init__(self, design_file_path: str = "design.json") -> None:
        """
        Initialize the DesignManager.

        Args:
            design_file_path: Path to the design.json file; defaults to
                "design.json" in the current directory.
        """
        self.design_file_path = Path(design_file_path)
        # Cached, most-recently loaded/constructed design (None until loaded).
        self.design: Optional[DesignModel] = None

    def load_design(self) -> DesignModel:
        """
        Load design.json from disk and parse it into a DesignModel.

        Returns:
            The parsed DesignModel instance (also cached on ``self.design``).

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If JSON parsing or model validation fails.
        """
        if not self.design_file_path.exists():
            raise FileNotFoundError(f"Design file not found: {self.design_file_path}")
        with open(self.design_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.design = DesignModel(**data)
        return self.design

    def save_design(self, design: Optional[DesignModel] = None) -> None:
        """
        Persist a DesignModel to design.json.

        Args:
            design: The DesignModel to save; falls back to ``self.design``
                when None.

        Raises:
            ValueError: If both ``design`` and ``self.design`` are None.
        """
        if design is None:
            design = self.design
        if design is None:
            raise ValueError("No design data to save.")
        # Make sure the target directory exists before writing.
        self.design_file_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.design_file_path, 'w', encoding='utf-8') as f:
            # model_dump() is the Pydantic v2 API used throughout the rest of
            # the project; the deprecated v1 .dict() was inconsistent here.
            json.dump(design.model_dump(), f, ensure_ascii=False, indent=2)

    def update_design(self, updates: Dict[str, Any]) -> None:
        """
        Merge top-level field updates into the current design.

        Args:
            updates: Dict of fields to replace, matching the DesignModel
                structure, e.g. {"project_name": "new_name", "files": [...]}.

        Note:
            Mutates ``self.design`` in place; loads the design from disk
            first if it has not been loaded yet.
        """
        if self.design is None:
            self.load_design()
        current_data = self.design.model_dump()
        current_data.update(updates)
        # Re-validate the merged data through the model constructor.
        self.design = DesignModel(**current_data)

    def sync_design(self) -> None:
        """
        Merge every file's pending ``design_updates`` into the main design
        and persist the result.
        """
        if self.design is None:
            self.load_design()
        for file_model in self.design.files:
            if file_model.design_updates:
                # merge_design_updates applies the pending updates onto the
                # file entry itself.
                file_model.merge_design_updates(file_model.design_updates)
                # Clear the buffer to mark these updates as synced.
                file_model.design_updates = {}
        self.save_design()
# 便捷函数
def load_design(file_path: str) -> DesignModel:
    """Convenience wrapper: load the design file at *file_path*."""
    return DesignManager(file_path).load_design()
def save_design(design: DesignModel, file_path: str) -> None:
    """Convenience wrapper: save *design* to the file at *file_path*."""
    DesignManager(file_path).save_design(design)

View File

@ -1,148 +0,0 @@
import os
import pathlib
import difflib
from typing import List, Optional
from .models import LLMResponse, OutputFormat
def read_file(file_path: str) -> str:
    """
    Read a UTF-8 text file and return its contents.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file's full contents as a string.

    Raises:
        FileNotFoundError: If no file exists at *file_path*.
        UnicodeDecodeError: If the contents are not valid UTF-8.
    """
    target = pathlib.Path(file_path)
    if target.exists():
        return target.read_text(encoding='utf-8')
    raise FileNotFoundError(f"File not found: {file_path}")
def write_file(file_path: str, content: str) -> None:
    """
    Write *content* to a file as UTF-8, creating parent directories as needed.

    Args:
        file_path: Destination path.
        content: Text to write.
    """
    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(file_path, 'w', encoding='utf-8') as fh:
        fh.write(content)
def ensure_directory_exists(dir_path: str) -> None:
    """
    Create the directory at *dir_path* (and any missing parents) if it does
    not already exist.

    Args:
        dir_path: Directory path to create.
    """
    os.makedirs(dir_path, exist_ok=True)
def generate_diff(old_content: str, new_content: str) -> str:
    """
    Produce a unified diff between two text blobs.

    Args:
        old_content: Original text.
        new_content: Updated text.

    Returns:
        The diff in unified format, labelled 'old' -> 'new'; empty string
        when the inputs are identical.
    """
    diff_lines = difflib.unified_diff(
        old_content.splitlines(keepends=True),
        new_content.splitlines(keepends=True),
        fromfile='old',
        tofile='new',
    )
    return ''.join(diff_lines)
def apply_diff(file_path: str, diff_content: str) -> None:
    """
    Apply a unified diff to the file at *file_path* and write the result back.

    Hunk headers (``@@ -start,count +start,count @@``) are parsed so the diff
    may cover any region of the file; lines outside the hunks are copied
    through unchanged.  (The previous implementation skipped the headers, so
    any diff whose first hunk did not begin at line 1 — i.e. any real diff
    with limited context — failed with a context mismatch, and lines between
    hunks were dropped.)

    Args:
        file_path: File to patch (read and written as UTF-8 via pathlib).
        diff_content: Diff in unified format.

    Raises:
        FileNotFoundError: If *file_path* does not exist.
        ValueError: If the diff does not apply cleanly (context/deletion
            mismatch, malformed or overlapping hunk header).
    """
    target = pathlib.Path(file_path)
    if not target.exists():
        raise FileNotFoundError(f"File not found: {file_path}")
    current_lines = target.read_text(encoding='utf-8').splitlines(keepends=True)

    new_lines = []
    j = 0  # index of the next unconsumed line in current_lines
    for i, line in enumerate(diff_content.splitlines(keepends=True)):
        if line.startswith('---') or line.startswith('+++'):
            continue  # file-name header lines carry no content
        if line.startswith('@@'):
            start = _parse_hunk_start(line, i)
            if start < j:
                raise ValueError(f"Hunk at diff line {i} overlaps a previous hunk")
            # Copy the untouched lines between the previous position and the
            # start of this hunk.
            new_lines.extend(current_lines[j:start])
            j = start
        elif line.startswith(' '):
            # Context line: must match the file exactly.
            if j < len(current_lines) and current_lines[j] == line[1:]:
                new_lines.append(current_lines[j])
                j += 1
            else:
                raise ValueError(f"Patch does not apply at line {i}: context mismatch")
        elif line.startswith('-'):
            # Deletion: the matching file line is consumed but not copied.
            if j < len(current_lines) and current_lines[j] == line[1:]:
                j += 1
            else:
                raise ValueError(f"Patch does not apply at line {i}: deletion mismatch")
        elif line.startswith('+'):
            new_lines.append(line[1:])
        # Anything else (e.g. "\ No newline at end of file") is ignored.

    # Copy whatever remains after the final hunk.
    new_lines.extend(current_lines[j:])
    target.write_text(''.join(new_lines), encoding='utf-8')


def _parse_hunk_start(header_line: str, diff_index: int) -> int:
    """
    Extract the zero-based start index in the *old* file from a unified-diff
    hunk header such as ``@@ -12,3 +12,4 @@``.

    For a zero-length old range (``-N,0``, a pure insertion) the insertion
    point is *after* line N, so N itself is returned.

    Raises:
        ValueError: If the header is malformed.
    """
    fields = header_line.split()
    if len(fields) < 2 or not fields[1].startswith('-'):
        raise ValueError(f"Malformed hunk header at diff line {diff_index}: {header_line!r}")
    parts = fields[1][1:].split(',')
    try:
        start = int(parts[0])
        count = int(parts[1]) if len(parts) > 1 else 1
    except ValueError:
        raise ValueError(
            f"Malformed hunk header at diff line {diff_index}: {header_line!r}"
        ) from None
    # Unified diffs are 1-based; a zero-count old range points just after 'start'.
    return start if count == 0 else start - 1
def handle_llm_response(file_path: str, response: LLMResponse) -> None:
    """
    Persist an LLM response: write the full code, or apply it as a diff,
    depending on ``response.output_format``.

    Args:
        file_path: Target file path.
        response: LLMResponse carrying the code/diff, description, commands
            and output format.

    Raises:
        ValueError: If the output format is not supported.
    """
    fmt = response.output_format
    if fmt == OutputFormat.DIFF:
        apply_diff(file_path, response.code)
    elif fmt == OutputFormat.FULL:
        write_file(file_path, response.code)
    else:
        raise ValueError(f"Unsupported output format: {fmt}")

View File

@ -1,12 +1,12 @@
import json
from pathlib import Path
from typing import Optional
from .core import CodeGenerator
from .core import BaseGenerator
from loguru import logger # 确保日志可用
class InitGenerator(CodeGenerator):
"""处理 init 命令的生成器类,继承自 CodeGenerator用于从 README 初始化项目。"""
class InitGenerator(BaseGenerator):
"""处理 init 命令的生成器类,继承自 BaseGenerator用于从 README 初始化项目。"""
def __init__(
self,

View File

@ -1,147 +0,0 @@
import json
import logging
from datetime import datetime
from typing import Optional
import openai
from pydantic import BaseModel
from .models import LLMResponse
class LLMClient:
    """LLM client wrapping API calls, response persistence and debug logging."""

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-3.5-turbo",
        base_url: Optional[str] = None,
        log_level: str = "INFO",
    ):
        """
        Initialize the LLM client.

        Args:
            api_key: LLM API key.
            model: Model name; defaults to "gpt-3.5-turbo".
            base_url: Optional API base URL for custom endpoints.
            log_level: Logging level; defaults to "INFO".
        """
        self.api_key = api_key
        self.model = model
        self.base_url = base_url
        # Build the OpenAI client, pointing at the custom endpoint if given.
        if self.base_url:
            self.client = openai.OpenAI(api_key=api_key, base_url=base_url)
        else:
            self.client = openai.OpenAI(api_key=api_key)
        # Configure a module-scoped logger; only attach a handler once so
        # repeated instantiation does not duplicate log lines.
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(getattr(logging, log_level.upper()))
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def call(
        self,
        system_prompt: str,
        user_prompt: str,
        temperature: float = 0.7,
        expect_json: bool = True,
    ) -> LLMResponse:
        """
        Call the LLM and return the parsed response.

        Args:
            system_prompt: System prompt.
            user_prompt: User prompt.
            temperature: Sampling temperature controlling randomness.
            expect_json: Whether a JSON response is expected (default True).

        Returns:
            An LLMResponse object.

        Raises:
            Exception: Any error raised during the API call or parsing is
                logged and re-raised.
        """
        self.logger.info("Starting LLM call.")
        self.logger.debug(f"System prompt (first 100 chars): {system_prompt[:100]}")
        self.logger.debug(f"User prompt (first 100 chars): {user_prompt[:100]}")
        self.logger.debug(f"Temperature: {temperature}, Expect JSON: {expect_json}")
        try:
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ]
            # Only send response_format when JSON output is requested: the
            # previous code passed an explicit None, which is forwarded to
            # the API instead of being omitted.
            request_kwargs = {
                "model": self.model,
                "messages": messages,
                "temperature": temperature,
            }
            if expect_json:
                request_kwargs["response_format"] = {"type": "json_object"}
            response = self.client.chat.completions.create(**request_kwargs)
            content = response.choices[0].message.content
            self.logger.info("LLM call successful.")
            if expect_json:
                data = json.loads(content)
                llm_response = LLMResponse(**data)
            else:
                # Non-JSON replies are wrapped in a default full-code response.
                llm_response = LLMResponse(
                    code=content,
                    description="Generated from non-JSON response",
                    commands=[],
                    output_format="full",
                )
            self.logger.info(f"Response parsed: {llm_response}")
            # Persist every exchange for debugging/auditing.
            self.save_response(llm_response, system_prompt, user_prompt)
            return llm_response
        except Exception as e:
            self.logger.error(f"LLM call failed: {e}")
            raise

    def save_response(
        self,
        response: LLMResponse,
        system_prompt: str,
        user_prompt: str,
        file_path: Optional[str] = None,
    ):
        """
        Save an LLM exchange (prompts + parsed response) to a JSON file.

        Args:
            response: The LLMResponse object.
            system_prompt: System prompt used for the call.
            user_prompt: User prompt used for the call.
            file_path: Optional target path; defaults to a timestamped file
                under the logs/ directory.
        """
        if file_path is None:
            import os
            os.makedirs("logs", exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            file_path = f"logs/llm_response_{timestamp}.json"
        data = {
            "timestamp": datetime.now().isoformat(),
            "system_prompt": system_prompt,
            "user_prompt": user_prompt,
            # model_dump() is the Pydantic v2 API used elsewhere in the
            # project; the deprecated v1 .dict() was inconsistent here.
            "response": response.model_dump(),
        }
        # Write UTF-8 with ensure_ascii=False so non-ASCII prompt/response
        # text is stored readably instead of as \u escapes.
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        self.logger.info(f"Response saved to {file_path}")

View File

@ -1,78 +0,0 @@
'''
状态管理器用于管理断点续写状态文件的读写线程安全
'''
import json
from pathlib import Path
from threading import Lock
from typing import Optional
from .models import StateModel
class StateManager:
    """Thread-safe reader/writer for the resume-state file."""

    def __init__(self, state_file_path: str):
        """
        Initialize the state manager.

        Args:
            state_file_path: Path of the state file.
        """
        self.state_file_path = Path(state_file_path)
        # Serializes all file access so concurrent writers cannot interleave
        # partial reads/writes through this manager.
        self.lock = Lock()

    def read_state(self) -> Optional[StateModel]:
        """
        Read the state from the file.

        Returns:
            A StateModel instance, or None when the file does not exist.

        Raises:
            RuntimeError: If the file exists but cannot be read or parsed.
        """
        with self.lock:
            if not self.state_file_path.exists():
                return None
            try:
                with open(self.state_file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return StateModel(**data)
            except (json.JSONDecodeError, IOError) as e:
                raise RuntimeError(f"Failed to read state file: {e}") from e

    def write_state(self, state: StateModel) -> None:
        """
        Persist the state to the file.

        Args:
            state: StateModel instance to write.

        Raises:
            RuntimeError: If the file cannot be written.
        """
        with self.lock:
            try:
                # Make sure the target directory exists before writing.
                self.state_file_path.parent.mkdir(parents=True, exist_ok=True)
                with open(self.state_file_path, 'w', encoding='utf-8') as f:
                    # model_dump() is the Pydantic v2 API used elsewhere in
                    # the project; the deprecated v1 .dict() was inconsistent.
                    json.dump(state.model_dump(), f, indent=2, ensure_ascii=False)
            except IOError as e:
                raise RuntimeError(f"Failed to write state file: {e}") from e

    def initialize_state(self, total_files: int, output_dir: str, readme_path: str) -> StateModel:
        """
        Create a fresh state, persist it, and return it.

        Args:
            total_files: Total number of files to generate.
            output_dir: Output directory of the generation run.
            readme_path: Path of the README file.

        Returns:
            The newly initialized StateModel.
        """
        state = StateModel(
            current_file_index=0,
            generated_files=[],
            dependencies_map={},
            file_statuses={},
            total_files=total_files,
            output_dir=output_dir,
            readme_path=readme_path,
        )
        self.write_state(state)
        return state