diff --git a/README.md b/README.md index 55adee8..778c4c1 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,7 @@ llm-codegen design project_readme.md -o ./my_design 5. 执行检查与修复。 + ## 📝 工单模板 ### 需求工单 (`feature.issue`) @@ -254,24 +255,6 @@ uv pip install -e ".[dev]" ## 项目结构 -### 工具代码结构 - -本项目(LLM 代码生成工具)的代码结构已重构,核心模块被拆分为多个专门的生成器,以提高模块化和可维护性。主要模块包括: - -- **src/llm_codegen/cli.py**: 命令行接口,使用 Typer 定义命令,分发到相应的生成器。 -- **src/llm_codegen/core.py**: 基础生成逻辑,包含 BaseGenerator 类,提供通用方法如调用 LLM 和文件操作。 -- **src/llm_codegen/init_generator.py**: 初始化命令生成器,处理 `init` 命令逻辑,继承自 BaseGenerator,负责从 README.md 生成完整项目。 -- **src/llm_codegen/enhance_generator.py**: 增强命令生成器,处理 `enhance` 命令逻辑,继承自 BaseGenerator,负责根据需求工单增量添加功能。 -- **src/llm_codegen/fix_generator.py**: 修复命令生成器,处理 `fix` 命令逻辑,继承自 BaseGenerator,负责根据 Bug 工单自动修复缺陷。 -- **src/llm_codegen/design_generator.py**: 设计文件生成器,处理 `design` 命令逻辑,生成中间设计文件 design.json。 -- **src/llm_codegen/utils.py**: 工具函数,如危险命令判断和文件操作。 -- **src/llm_codegen/models.py**: 数据模型,使用 Pydantic 定义数据结构。 -- **src/llm_codegen/diff_applier.py**: 应用代码差异的工具模块(如有)。 - -**设计思想**: 通过将核心逻辑拆分为独立的生成器,每个生成器专注于一个特定任务(初始化、增强、修复、设计),使得代码更易于维护和扩展。BaseGenerator 提供共享功能,减少代码重复,并确保一致性。 - -### 生成的项目结构 - 生成的项目将包含以下文件和目录: ```txt . diff --git a/design.json b/design.json index 2213c8b..01b5600 100644 --- a/design.json +++ b/design.json @@ -3,7 +3,7 @@ "version": "1.0.0", "description": "一个基于大语言模型的智能代码生成与维护工具,支持自动生成、增量添加功能和自动修复Bug。", "files": [ - { + { "path": "README.md", "summary": "项目说明文档,包含项目概述、功能介绍和使用说明", "dependencies": [ @@ -13,6 +13,7 @@ "classes": [], "design_updates": {} }, + { "path": "pyproject.toml", "summary": "项目元数据、依赖配置和脚本入口", @@ -25,7 +26,7 @@ "path": "src/llm_codegen/__init__.py", "summary": "包初始化文件", "dependencies": [ - "src/llm_codegen/core.py" + "src/llm_codegen/core.py" ], "functions": [], "classes": [], @@ -303,54 +304,6 @@ "functions": [], "classes": [], "design_updates": {} - }, - { - "path": "src/llm_codegen/llm_client.py", - "summary": "自动生成的新文件", - "dependencies": [], - "functions": [], - "classes": [], - "design_updates": {} - }, - { - "path": "src/llm_codegen/file_operations.py", - "summary": "自动生成的新文件", - "dependencies": [], - "functions": [], - "classes": [], - "design_updates": {} - }, - { - "path": "src/llm_codegen/command_executor.py", - "summary": "自动生成的新文件", - "dependencies": [], - "functions": [], - "classes": [], - "design_updates": {} - }, - { - "path": "src/llm_codegen/dependency_sorter.py", - "summary": "自动生成的新文件", - "dependencies": [], - "functions": [], - "classes": [], - "design_updates": {} - }, - { - "path": "src/llm_codegen/design_manager.py", - "summary": "自动生成的新文件", - "dependencies": [], - "functions": [], - "classes": [], - "design_updates": {} - }, - { - "path": "src/llm_codegen/state_manager.py", - "summary": "自动生成的新文件", - "dependencies": [], - "functions": [], - "classes": [], - "design_updates": {} } ], "commands": [ diff --git a/issues/refactor-split-core.issue b/issues/refactor-split-core.issue index fa3bbbf..d816bc5 100644 --- a/issues/refactor-split-core.issue +++ b/issues/refactor-split-core.issue @@ -30,829 +30,4 @@ acceptance_criteria: - README.md 中的“项目结构”部分更新为新文件列表,并简要说明各模块职责。 - 日志记录、进度显示、错误提示等用户体验相关功能保持不变。 - 代码风格符合项目规范(通过 black、pylint 等检查)。 - - 生成对应的单元测试。 - - - > 原core.py内容为: - ```python - import json -import os -import subprocess -import sys -import concurrent.futures -import pendulum -from typing import List, Dict, Optional, Any, Tuple -from pathlib import Path -from collections import deque -import threading - -from rich.console import Console -from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID -from loguru import logger -from openai import OpenAI - -from .utils import is_dangerous_command -from .models import ( - DesignModel, - StateModel, - FileModel, - FileStatus, -) # 添加 FileStatus 导入 - - -class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 - """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" - - def __init__( - self, - api_key: Optional[str] = None, - base_url: str = "https://api.deepseek.com", - model: str = "deepseek-reasoner", - output_dir: str = "./generated", - log_file: Optional[str] = None, - max_concurrency: int = 4, - ): - """ - 初始化生成器 - - Args: - api_key: OpenAI API密钥,默认从环境变量DEEPSEEK_APIKEY读取 - base_url: API基础URL - model: 使用的模型 - output_dir: 输出根目录 - log_file: 日志文件路径,默认自动生成 - """ - self.api_key = api_key or os.getenv("DEEPSEEK_APIKEY") - if not self.api_key: - raise ValueError("必须提供API密钥,或设置环境变量DEEPSEEK_APIKEY") - - self.client = OpenAI(api_key=self.api_key, base_url=base_url) - self.model = model - self.output_dir = Path(output_dir) - self.output_dir.mkdir(parents=True, exist_ok=True) - self.state_file = self.output_dir / ".llm_generator_state.json" - self.console = Console() # 添加console实例用于rich打印 - self._state_lock = threading.Lock() - - self.max_concurrency = max_concurrency - - # 配置日志 - if log_file is None: - log_file = self.output_dir / "generator.log" - logger.remove() # 移除默认handler - logger.add(sys.stderr, level="WARNING") # 控制台输出WARNING及以上 - logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG - logger.info(f"日志已初始化,保存至: {log_file}") - - self.readme_content = None - self.design: Optional[DesignModel] = None - self.state: Optional[StateModel] = None - self.progress: Optional[Progress] = None - self.tasks: Dict[str, TaskID] = {} # 任务ID映射 - - def _call_llm( - self, - system_prompt: str, - user_prompt: str, - temperature: float = 0.2, - expect_json: bool = True, - ) -> Dict[str, Any]: - """ - 调用LLM并返回解析后的JSON - """ - logger.debug(f"调用LLM,模型: {self.model}") - - try: - response = self.client.chat.completions.create( - model=self.model, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ], - temperature=temperature, - response_format={"type": "json_object"} if expect_json else None, - ) - - message = response.choices[0].message - content = message.content - - # 记录思考过程(如果存在) - reasoning_content = None - if hasattr(message, "reasoning_content") and message.reasoning_content: - reasoning_content = message.reasoning_content - logger.info("模型思考过程已记录") - - # 创建响应目录 - responses_dir = self.output_dir / "llm_responses" - responses_dir.mkdir(parents=True, exist_ok=True) - - # 生成文件名(使用当前时间) - timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS") - response_file = responses_dir / f"response_{timestamp}.json" - - # 保存响应到JSON文件 - response_data = { - "timestamp": timestamp, - "model": self.model, - "content": content, - "reasoning_content": reasoning_content, - "system_prompt": system_prompt, - "user_prompt": user_prompt, - "temperature": temperature, - "expect_json": expect_json, - } - - with open(response_file, "w", encoding="utf-8") as f: - json.dump(response_data, f, indent=2, ensure_ascii=False) - - logger.debug(f"LLM原始响应: {response_file.name}") - - if expect_json: - result = json.loads(content) - else: - result = {"content": content} - - return result - - except json.JSONDecodeError as e: - logger.error(f"JSON解析失败: {e}") - self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]") - raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}") - except Exception as e: - logger.error(f"LLM调用失败: {e}") - self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") - raise - - def parse_readme(self, readme_path: Path) -> str: - """ - 读取README文件内容 - """ - logger.info(f"读取README文件: {readme_path}") - try: - with open(readme_path, "r", encoding="utf-8") as f: - content = f.read() - logger.debug(f"README内容长度: {len(content)} 字符") - return content - except Exception as e: - logger.error(f"读取README失败: {e}") - self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]") - raise - - def generate_design_json(self) -> DesignModel: - """ - 调用LLM生成design.json内容,并解析为DesignModel - """ - system_prompt = ( - "你是一个软件架构师。请根据README描述,生成项目的中间设计文件design.json。" - "design.json应包含项目名称、版本、描述、文件列表(含路径、摘要、依赖、函数和类)、建议命令和检查工具。" - "返回严格的 JSON 对象,符合DesignModel结构。" - ) - user_prompt = f"README内容如下:\n\n{self.readme_content}" - - result = self._call_llm(system_prompt, user_prompt) - design_data = result - design = DesignModel(**design_data) - - # 写入design.json文件 - design_path = self.output_dir / "design.json" - with open(design_path, "w", encoding="utf-8") as f: - json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) - logger.info(f"已生成design.json: {design_path}") - - return design - - def load_state(self) -> Optional[StateModel]: - """加载断点续写状态""" - if self.state_file.exists(): - try: - with open(self.state_file, "r", encoding="utf-8") as f: - state_data = json.load(f) - self.state = StateModel(**state_data) - logger.info( - f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个" - ) - return self.state - except Exception as e: - logger.error(f"加载状态失败: {e}") - self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") - return None - return None - - def save_state( - self, generated_files: List[str], dependencies_map: Dict[str, List[str]] - ) -> None: - """保存断点续写状态,适应并发生成(线程安全)""" - with self._state_lock: # 串行化写入 - state = StateModel( - current_file_index=0, - generated_files=generated_files, - dependencies_map=dependencies_map, - total_files=len(self.design.files) if self.design else 0, - output_dir=str(self.output_dir), - readme_path=self.readme_content[:100] if self.readme_content else "", - ) - with open(self.state_file, "w", encoding="utf-8") as f: - json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) - logger.debug(f"状态已保存: {self.state_file}") - - def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: - """ - 从design.json获取文件列表和依赖关系 - - Returns: - (files, dependencies) - files: 按顺序需要生成的文件路径列表 - dependencies: 字典 {file: [依赖文件路径]} - """ - if not self.design: - raise ValueError("design.json未加载,请先调用generate_design_json") - - files = [file.path for file in self.design.files] - dependencies = {file.path: file.dependencies for file in self.design.files} - - logger.info(f"从design.json解析到 {len(files)} 个待生成文件") - logger.debug(f"文件列表: {files}") - logger.debug(f"依赖关系: {dependencies}") - - return files, dependencies - - def _add_implicit_dependencies( - self, files: List[str], dependencies: Dict[str, List[str]] - ) -> Dict[str, List[str]]: - """ - 添加隐式依赖关系,基于文件路径和常见模式 - - Args: - files: 文件路径列表 - dependencies: 原始依赖字典 - - Returns: - Dict[str, List[str]]: 增强后的依赖字典 - """ - enhanced = dependencies.copy() - for file in files: - if file not in enhanced: - enhanced[file] = [] - # 添加同一目录下的其他文件作为隐式依赖(简单示例) - path = Path(file) - implicit_deps = [ - f - for f in files - if f != file - and Path(f).parent == path.parent - and f not in enhanced[file] - ] - if implicit_deps: - enhanced[file].extend(implicit_deps) - logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}") - return enhanced - - def generate_file( - self, - file_path: str, - prompt_instruction: str, - dependency_files: List[str], - existing_content: Optional[str] = None, - output_format: str = "full", # 新增参数,默认 'full' - ) -> Tuple[str, str, List[str]]: - """ - 生成单个文件,返回 (代码, 描述, 命令列表) - - Args: - file_path: 目标文件路径 - prompt_instruction: 生成指令 - dependency_files: 依赖文件列表(用于上下文) - existing_content: 文件现有内容(若为修改模式) - output_format: 输出格式,'full',来自 models.py - """ - # 收集上下文内容 - context_content = [] - - if self.readme_content: - context_content.append(f"### 项目 README ###\n{self.readme_content}\n") - - # 添加 design.json 上下文 - design_path = self.output_dir / "design.json" - if design_path.exists(): - try: - with open(design_path, "r", encoding="utf-8") as f: - design_content = f.read() - context_content.append( - f"### 设计文件: design.json ###\n{design_content}\n" - ) - except Exception as e: - logger.error(f"读取design.json失败: {e}") - self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]") - # 如果design.json读取失败,可能无法继续,但保持上下文为空或部分 - - # 添加依赖文件内容(仅读取存在的文件) - for dep in dependency_files: - dep_path = Path(dep) - if not dep_path.exists(): - alt_path = self.output_dir / dep - if alt_path.exists(): - dep_path = alt_path - else: - logger.warning(f"依赖文件不存在,已跳过: {dep}") - self.console.print( - f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]" - ) - continue - - try: - with open(dep_path, "r", encoding="utf-8") as f: - content = f.read() - context_content.append( - f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n" - ) - except Exception as e: - logger.error(f"读取依赖文件 {dep} 失败: {e}") - self.console.print( - f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]" - ) - # 跳过此依赖文件 - - # 如果有现有内容,也加入上下文 - if existing_content is not None: - context_content.append( - f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n" - ) - - full_context = "\n".join(context_content) - - # output_format 为 'full' 或其他,保持现有逻辑 - if existing_content is not None: - system_prompt = ( - "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。" - "返回严格的 JSON 对象,包含四个字段:\n" - "- code: (string) 修改后的完整代码\n" - "- description: (string) 简短的中文修改描述\n" - "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" - "- output_format: (string) 应为 'full'" - ) - else: - system_prompt = ( - "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。" - "返回严格的 JSON 对象,包含四个字段:\n" - "- code: (string) 生成的完整代码\n" - "- description: (string) 简短的中文功能描述\n" - "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" - "- output_format: (string) 应为 'full'" - ) - - user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" - - try: - result = self._call_llm(system_prompt, user_prompt) - code = result.get("code") - description = result.get("description", "") - commands = result.get("commands", []) - result.get("output_format", "full") - if code is None: - raise ValueError("LLM 响应中没有 code 字段") - return code, description, commands - except Exception as e: - logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") - self.console.print( - f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]" - ) - # 返回默认值以便继续 - return "# 生成失败,请检查日志", "生成失败,发生错误", [] - - def _generate_file_task( - self, file_path: str, dependencies: List[str], generated_files: set - ) -> Tuple[bool, str]: - """ - 并发任务函数,用于生成单个文件 - - Args: - file_path: 文件路径 - dependencies: 依赖文件列表 - generated_files: 已生成文件的集合(用于上下文) - - Returns: - Tuple[bool, str]: (是否成功, 错误信息或空字符串) - """ - try: - instruction = ( - f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" - ) - # 过滤依赖文件,只使用已生成的 - available_deps = [dep for dep in dependencies if dep in generated_files] - code, desc, commands = self.generate_file( - file_path, instruction, available_deps - ) - logger.info(f"生成完成: {file_path} - {desc}") - - # 写入文件 - output_path = self.output_dir / file_path - output_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_path, "w", encoding="utf-8") as f: - f.write(code) - logger.info(f"已写入: {output_path}") - - # 执行命令 - for cmd in commands: - logger.info(f"准备执行命令: {cmd}") - success = self.execute_command(cmd, cwd=self.output_dir) - if not success: - logger.warning(f"命令执行失败,但继续处理: {cmd}") - return True, "" - except Exception as e: - logger.error(f"生成文件 {file_path} 失败: {e}") - return False, str(e) - - def _topological_sort( - self, files: List[str], dependencies: Dict[str, List[str]] - ) -> List[str]: - """ - 对文件列表进行拓扑排序,基于依赖关系。 - 返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。 - 如果检测到循环依赖,抛出ValueError。 - """ - from collections import deque - - # 初始化入度和反向邻接表 - in_degree = {f: 0 for f in files} - rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f - - # 构建图:如果文件f依赖于dep,则增加f的入度,并将f加入rev_graph[dep] - for f in files: - for dep in dependencies.get(f, []): - if dep in files: # 只考虑在files中的依赖 - in_degree[f] += 1 # f依赖于dep,所以f的入度增加 - rev_graph[dep].append(f) # dep被f依赖 - - # 队列初始化为入度为0的文件(无依赖的文件) - queue = deque([f for f in files if in_degree[f] == 0]) - sorted_files = [] - - while queue: - node = queue.popleft() - sorted_files.append(node) - # 所有依赖于node的文件入度减1 - for dependent in rev_graph[node]: - in_degree[dependent] -= 1 - if in_degree[dependent] == 0: - queue.append(dependent) - - # 检查是否所有文件都已排序(无循环依赖) - if len(sorted_files) != len(files): - raise ValueError( - f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。" - ) - - return sorted_files - - def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: - """ - 执行单个命令,检查风险,失败仅记录错误不抛出异常 - - Returns: - bool: 命令是否成功执行 - """ - dangerous, reason = is_dangerous_command(cmd) - if dangerous: - logger.error(f"危险命令被阻止: {cmd},原因: {reason}") - self.console.print( - f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]" - ) - return False - - logger.info(f"执行命令: {cmd}") - try: - result = subprocess.run( - cmd, - shell=True, - cwd=cwd or self.output_dir, - capture_output=True, - text=True, - timeout=300, # 5分钟超时 - ) - logger.debug(f"命令返回码: {result.returncode}") - if result.stdout: - logger.debug(f"stdout: {result.stdout[:500]}") - if result.stderr: - logger.warning(f"stderr: {result.stderr[:500]}") - if result.returncode != 0: - logger.error(f"命令执行失败,返回码: {result.returncode}") - self.console.print( - f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]" - ) - return False - return True - except subprocess.TimeoutExpired: - logger.error(f"命令执行超时: {cmd}") - self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]") - return False - except Exception as e: - logger.error(f"命令执行失败: {e}") - self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]") - return False - - def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]: - """ - 调用 LLM 分析工单,返回结构化变更计划 - """ - system_prompt = ( - "你是一个软件架构师。根据用户提供的工单内容和现有项目设计文件(design.json)," - "分析需要进行的代码变更。返回严格的 JSON 对象,包含以下字段:\n" - "- affected_files: 数组,每个元素为一个对象,包含:\n" - " - path: 文件路径(相对于项目根目录)\n" - " - action: 'create' 或 'modify'\n" - " - description: 对此文件变更的简短描述\n" - " - dependencies: 此文件可能依赖的其他文件路径列表(可选)\n" - "- design_updates: 对象,描述对 design.json 的更新,例如新增的文件条目、修改的摘要等(可选)\n" - "注意:仅返回 JSON,不要包含其他文本。" - ) - - # 将现有 design.json 内容作为上下文的一部分 - if not self.design: - design_path = self.output_dir / "design.json" - try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - self.design = DesignModel(**design_data) - except Exception as e: - logger.error(f"加载design.json失败: {e}") - self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") - raise e - design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False) - user_prompt = ( - f"工单类型: {issue_type}\n" - f"工单内容:\n{issue_content}\n\n" - f"现有设计文件 (design.json):\n{design_str}" - ) - - result = self._call_llm(system_prompt, user_prompt, temperature=0.2) - return result - - def _update_design( - self, generated_files: List[str], design_updates: Dict[str, Any] - ): - """ - 根据生成的变更更新 design.json - 使用 FileModel 来处理文件信息 - """ - updated = False - - # 处理新增文件 - for file_path in generated_files: - # 检查文件是否已在 design.files 中 - exists = any(f.path == file_path for f in self.design.files) - if not exists: - # 获取更新信息 - update_info = design_updates.get(file_path, {}) - - # 创建新文件条目(FileModel实例) - new_file = FileModel( - path=file_path, - summary=update_info.get("summary", "自动生成的新文件"), - dependencies=update_info.get("dependencies", []), - functions=update_info.get("functions", []), - classes=update_info.get("classes", []), - design_updates=update_info.get("design_updates", {}), - ) - self.design.files.append(new_file) - updated = True - logger.info(f"已将新文件 {file_path} 添加到 design.json") - - # 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要) - # 这里可根据实际需求扩展,当前仅处理新增文件 - - if updated: - # 保存更新后的 design.json - design_path = self.output_dir / "design.json" - with open(design_path, "w", encoding="utf-8") as f: - json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) - logger.info("design.json 已更新") - - def refresh_design(self) -> bool: - """ - 重新生成design.json,基于当前README内容或加载的design.json - 返回bool表示是否成功 - """ - logger.info("开始刷新design.json") - if not self.readme_content: - # 尝试读取README.md文件 - readme_path = self.output_dir / "README.md" - if readme_path.exists(): - try: - self.readme_content = self.parse_readme(readme_path) - except Exception as e: - logger.error(f"读取README.md失败,无法刷新design: {e}") - self.console.print( - f"[bold red]❌ 读取README.md失败,无法刷新design: {e}[/bold red]" - ) - return False - else: - logger.error("没有README内容,且README.md文件不存在,无法刷新design") - self.console.print( - "[bold red]❌ 没有README内容,且README.md文件不存在,无法刷新design[/bold red]" - ) - return False - - try: - self.design = self.generate_design_json() - logger.info("design.json已成功重新生成") - self.console.print("[green]✅ design.json已重新生成[/green]") - return True - except Exception as e: - logger.error(f"重新生成design.json失败: {e}") - self.console.print(f"[bold red]❌ 重新生成design.json失败: {e}[/bold red]") - return False - - def update_file_entry(self, file_path: str, file_content: str) -> bool: - """ - 更新design.json中单个文件的条目,基于提供的文件内容 - 返回bool表示是否成功 - """ - logger.info(f"开始更新design.json中文件条目: {file_path}") - if not self.design: - # 加载现有design.json - design_path = self.output_dir / "design.json" - if not design_path.exists(): - logger.error(f"design.json不存在于 {self.output_dir}") - self.console.print( - f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" - ) - return False - try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - self.design = DesignModel(**design_data) - except Exception as e: - logger.error(f"加载design.json失败: {e}") - self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") - return False - - # 调用LLM分析文件内容,返回更新信息,增强以支持design_updates字段 - system_prompt = ( - "你是一个软件架构师。分析给定的文件内容,并返回对design.json中该文件条目的更新。" - "返回严格的JSON对象,包含以下字段:\n" - "- summary: 文件的新摘要\n" - "- dependencies: 依赖文件列表\n" - "- functions: 函数列表,每个对象有name, summary, inputs, outputs\n" - "- classes: 类列表,每个对象有name, summary, methods\n" - "- design_updates: 可选,设计更新字典\n" - "注意:仅返回JSON,不要其他文本。" - ) - # 准备当前design.json中该文件的条目信息 - current_entry = None - for f in self.design.files: - if f.path == file_path: - current_entry = f.model_dump() - break - user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目(如果存在):\n{json.dumps(current_entry, indent=2) if current_entry else '无'}" - - try: - result = self._call_llm(system_prompt, user_prompt, temperature=0.2) - update_info = result - - # 查找或创建文件条目 - file_model = None - for f in self.design.files: - if f.path == file_path: - file_model = f - break - if file_model is None: - # 创建新条目,包括design_updates - new_file = FileModel( - path=file_path, - summary=update_info.get("summary", ""), - dependencies=update_info.get("dependencies", []), - functions=update_info.get("functions", []), - classes=update_info.get("classes", []), - design_updates=update_info.get("design_updates", {}), # 新增design_updates处理 - ) - self.design.files.append(new_file) - logger.info(f"在design.json中创建了新文件条目: {file_path}") - else: - # 更新现有条目,使用merge_design_updates处理design_updates - if 'design_updates' in update_info: - file_model.merge_design_updates(update_info['design_updates']) - # 更新其他字段 - file_model.summary = update_info.get("summary", file_model.summary) - file_model.dependencies = update_info.get( - "dependencies", file_model.dependencies - ) - file_model.functions = update_info.get( - "functions", file_model.functions - ) - file_model.classes = update_info.get("classes", file_model.classes) - logger.info(f"更新了design.json中的文件条目: {file_path}") - - # 保存更新后的design.json - design_path = self.output_dir / "design.json" - with open(design_path, "w", encoding="utf-8") as f: - json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) - logger.info(f"design.json已更新,文件条目: {file_path}") - self.console.print( - f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]" - ) - return True - except Exception as e: - logger.error(f"更新文件条目失败: {e}") - self.console.print(f"[bold red]❌ 更新文件条目失败: {e}[/bold red]") - return False - - def sync_readme(self) -> bool: - """ - 同步README.md和design.json,确保内容一致性 - 返回bool表示是否成功 - """ - logger.info("开始同步README.md和design.json") - # 读取README.md - readme_path = self.output_dir / "README.md" - if not readme_path.exists(): - logger.error(f"README.md不存在于 {self.output_dir}") - self.console.print( - f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]" - ) - return False - try: - with open(readme_path, "r", encoding="utf-8") as f: - readme_content = f.read() - except Exception as e: - logger.error(f"读取README.md失败: {e}") - self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") - return False - - # 加载design.json - design_path = self.output_dir / "design.json" - if not design_path.exists(): - logger.error(f"design.json不存在于 {self.output_dir}") - self.console.print( - f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" - ) - return False - try: - with open(design_path, "r", encoding="utf-8") as f: - design_data = json.load(f) - design = DesignModel(**design_data) - except Exception as e: - logger.error(f"加载design.json失败: {e}") - self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") - return False - - # 调用LLM比较和同步 - system_prompt = ( - "你是一个软件架构师。比较README.md内容和design.json,识别不一致之处,并建议更新。" - "返回严格的JSON对象,包含以下字段:\n" - "- needs_update: bool, 是否需要更新\n" - "- update_type: 'readme' 或 'design' 或 'both', 指示哪个需要更新\n" - "- updates: 对象,描述具体的更新内容\n" - "注意:仅返回JSON,不要其他文本。" - ) - user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}" - - try: - result = self._call_llm(system_prompt, user_prompt, temperature=0.2) - needs_update = result.get("needs_update", False) - if not needs_update: - logger.info("README.md和design.json已同步,无需更新") - self.console.print( - "[green]✅ README.md和design.json已同步,无需更新[/green]" - ) - return True - - update_type = result.get("update_type", "") - updates = result.get("updates", {}) - if update_type == "readme": - # 更新README.md - new_readme = updates.get("new_readme", readme_content) - with open(readme_path, "w", encoding="utf-8") as f: - f.write(new_readme) - logger.info("已更新README.md") - self.console.print("[green]✅ README.md已更新[/green]") - elif update_type == "design": - # 更新design.json - new_design_data = updates.get("new_design", design.model_dump()) - design = DesignModel(**new_design_data) - with open(design_path, "w", encoding="utf-8") as f: - json.dump(new_design_data, f, indent=2, ensure_ascii=False) - logger.info("已更新design.json") - self.console.print("[green]✅ design.json已更新[/green]") - elif update_type == "both": - # 更新两者 - new_readme = updates.get("new_readme", readme_content) - new_design_data = updates.get("new_design", design.model_dump()) - with open(readme_path, "w", encoding="utf-8") as f: - f.write(new_readme) - design = DesignModel(**new_design_data) - with open(design_path, "w", encoding="utf-8") as f: - json.dump(new_design_data, f, indent=2, ensure_ascii=False) - logger.info("已同步更新README.md和design.json") - self.console.print("[green]✅ README.md和design.json已同步更新[/green]") - else: - logger.warning(f"未知的update_type: {update_type}") - self.console.print( - f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]" - ) - return False - return True - except Exception as e: - logger.error(f"同步README.md失败: {e}") - self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") - return False -``` \ No newline at end of file + - 生成对应的单元测试。 \ No newline at end of file diff --git a/src/llm_codegen/cli.py b/src/llm_codegen/cli.py index e88afa9..9172bde 100644 --- a/src/llm_codegen/cli.py +++ b/src/llm_codegen/cli.py @@ -13,6 +13,7 @@ from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn from loguru import logger +from .core import BaseGenerator from .init_generator import InitGenerator from .enhance_generator import EnhanceGenerator from .fix_generator import FixGenerator diff --git a/src/llm_codegen/command_executor.py b/src/llm_codegen/command_executor.py deleted file mode 100644 index ada8c5d..0000000 --- a/src/llm_codegen/command_executor.py +++ /dev/null @@ -1,51 +0,0 @@ -import subprocess -import os -from typing import Optional, Tuple -from loguru import logger -from .utils import is_dangerous_command - - -class CommandExecutor: - """ - 命令执行器,负责执行系统命令并集成危险命令拦截。 - """ - - def __init__(self): - pass - - def execute(self, cmd: str, cwd: Optional[str] = None) -> Tuple[bool, str]: - """ - 执行系统命令,在执行前检查是否危险。 - - Args: - cmd: 要执行的命令字符串。 - cwd: 工作目录路径,如果为 None 则使用当前目录。 - - Returns: - Tuple[bool, str]: (执行是否成功, 输出或错误消息)。 - """ - # 检查命令是否危险 - is_dangerous, reason = is_dangerous_command(cmd) - if is_dangerous: - logger.warning(f"危险命令被拦截: {cmd}, 原因: {reason}") - return False, f"命令危险被拦截: {reason}" - - try: - # 执行命令 - result = subprocess.run( - cmd, - shell=True, # 使用 shell 执行命令字符串 - cwd=cwd, - capture_output=True, - text=True, - encoding='utf-8' - ) - if result.returncode == 0: - logger.info(f"命令执行成功: {cmd}") - return True, result.stdout - else: - logger.error(f"命令执行失败: {cmd}, 错误: {result.stderr}") - return False, result.stderr - except Exception as e: - logger.error(f"执行命令时发生异常: {cmd}, 异常: {e}") - return False, str(e) diff --git a/src/llm_codegen/core.py b/src/llm_codegen/core.py index be7daa4..95d569d 100644 --- a/src/llm_codegen/core.py +++ b/src/llm_codegen/core.py @@ -14,25 +14,17 @@ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID from loguru import logger from openai import OpenAI -from .utils import is_dangerous_command, read_file as utils_read_file, write_file as utils_write_file, ensure_dir, safe_join, log_error, is_fatal_error, build_dependency_graph, compute_in_degrees, topological_sort as utils_topological_sort, create_progress_bar +from .utils import is_dangerous_command from .models import ( DesignModel, StateModel, FileModel, FileStatus, - LLMResponse, - OutputFormat -) -from .llm_client import LLMClient -from .file_operations import handle_llm_response, generate_diff, apply_diff -from .command_executor import CommandExecutor -from .dependency_sorter import topological_sort as dependency_topological_sort, detect_cycles -from .design_manager import DesignManager -from .state_manager import StateManager +) # 添加 FileStatus 导入 class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 - """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行,使用组合模式集成模块""" + """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" def __init__( self, @@ -57,11 +49,14 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 if not self.api_key: raise ValueError("必须提供API密钥,或设置环境变量DEEPSEEK_APIKEY") + self.client = OpenAI(api_key=self.api_key, base_url=base_url) + self.model = model self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.state_file = self.output_dir / ".llm_generator_state.json" self.console = Console() # 添加console实例用于rich打印 self._state_lock = threading.Lock() + self.max_concurrency = max_concurrency # 配置日志 @@ -72,18 +67,6 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG logger.info(f"日志已初始化,保存至: {log_file}") - # 初始化模块实例(组合模式) - self.llm_client = LLMClient( - api_key=self.api_key, - model=model, - base_url=base_url if base_url != "https://api.deepseek.com" else None, # LLMClient 处理默认URL - log_level="INFO" - ) - self.command_executor = CommandExecutor() - self.design_manager = DesignManager(design_file_path=str(self.output_dir / "design.json")) - self.state_manager = StateManager(state_file_path=str(self.state_file)) - # file_operations 和 dependency_sorter 作为函数模块直接使用 - self.readme_content = None self.design: Optional[DesignModel] = None self.state: Optional[StateModel] = None @@ -98,24 +81,66 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 expect_json: bool = True, ) -> Dict[str, Any]: """ - 调用LLM并返回解析后的JSON,使用 LLMClient 模块 + 调用LLM并返回解析后的JSON """ - logger.debug(f"调用LLM,模型: {self.llm_client.model}") + logger.debug(f"调用LLM,模型: {self.model}") + try: - llm_response = self.llm_client.call( - system_prompt=system_prompt, - user_prompt=user_prompt, + response = self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], temperature=temperature, - expect_json=expect_json + response_format={"type": "json_object"} if expect_json else None, ) - # 转换为字典以保持接口兼容 - result = { - "code": llm_response.code, - "description": llm_response.description, - "commands": llm_response.commands, - "output_format": llm_response.output_format.value + + message = response.choices[0].message + content = message.content + + # 记录思考过程(如果存在) + reasoning_content = None + if hasattr(message, "reasoning_content") and message.reasoning_content: + reasoning_content = message.reasoning_content + logger.info("模型思考过程已记录") + + # 创建响应目录 + responses_dir = self.output_dir / "llm_responses" + responses_dir.mkdir(parents=True, exist_ok=True) + + # 生成文件名(使用当前时间) + timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS") + response_file = responses_dir / f"response_{timestamp}.json" + + # 保存响应到JSON文件 + response_data = { + "timestamp": timestamp, + "model": self.model, + "content": content, + "reasoning_content": reasoning_content, + "system_prompt": system_prompt, + "user_prompt": user_prompt, + "temperature": temperature, + "expect_json": expect_json, } + + with open(response_file, "w", encoding="utf-8") as f: + json.dump(response_data, f, indent=2, ensure_ascii=False) + + logger.debug(f"LLM原始响应: {response_file.name}") + + if expect_json: + result = json.loads(content) + else: + result = {"content": content} + return result + + except json.JSONDecodeError as e: + logger.error(f"JSON解析失败: {e}") + self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]") + raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}") except Exception as e: logger.error(f"LLM调用失败: {e}") self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") @@ -123,11 +148,12 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 def parse_readme(self, readme_path: Path) -> str: """ - 读取README文件内容,使用 file_operations 模块 + 读取README文件内容 """ logger.info(f"读取README文件: {readme_path}") try: - content = utils_read_file(str(readme_path)) + with open(readme_path, "r", encoding="utf-8") as f: + content = f.read() logger.debug(f"README内容长度: {len(content)} 字符") return content except Exception as e: @@ -150,30 +176,35 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 design_data = result design = DesignModel(**design_data) - # 保存design.json文件 - self.design_manager.save_design(design) - logger.info(f"已生成design.json: {self.design_manager.design_file_path}") + # 写入design.json文件 + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info(f"已生成design.json: {design_path}") return design def load_state(self) -> Optional[StateModel]: - """加载断点续写状态,使用 StateManager 模块""" - try: - self.state = self.state_manager.read_state() - if self.state: + """加载断点续写状态""" + if self.state_file.exists(): + try: + with open(self.state_file, "r", encoding="utf-8") as f: + state_data = json.load(f) + self.state = StateModel(**state_data) logger.info( f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个" ) - return self.state - except Exception as e: - logger.error(f"加载状态失败: {e}") - self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") - return None + return self.state + except Exception as e: + logger.error(f"加载状态失败: {e}") + self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") + return None + return None def save_state( self, generated_files: List[str], dependencies_map: Dict[str, List[str]] ) -> None: - """保存断点续写状态,适应并发生成(线程安全),使用 StateManager 模块""" + """保存断点续写状态,适应并发生成(线程安全)""" with self._state_lock: # 串行化写入 state = StateModel( current_file_index=0, @@ -183,7 +214,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 output_dir=str(self.output_dir), readme_path=self.readme_content[:100] if self.readme_content else "", ) - self.state_manager.write_state(state) + with open(self.state_file, "w", encoding="utf-8") as f: + json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) logger.debug(f"状态已保存: {self.state_file}") def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: @@ -196,11 +228,7 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 dependencies: 字典 {file: [依赖文件路径]} """ if not self.design: - # 尝试加载设计 - try: - self.design = self.design_manager.load_design() - except Exception as e: - raise ValueError(f"design.json未加载或加载失败: {e}") + raise ValueError("design.json未加载,请先调用generate_design_json") files = [file.path for file in self.design.files] dependencies = {file.path: file.dependencies for file in self.design.files} @@ -270,7 +298,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 design_path = self.output_dir / "design.json" if design_path.exists(): try: - design_content = utils_read_file(str(design_path)) + with open(design_path, "r", encoding="utf-8") as f: + design_content = f.read() context_content.append( f"### 设计文件: design.json ###\n{design_content}\n" ) @@ -294,7 +323,8 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 continue try: - content = utils_read_file(str(dep_path)) + with open(dep_path, "r", encoding="utf-8") as f: + content = f.read() context_content.append( f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n" ) @@ -336,19 +366,13 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" try: - llm_response = self.llm_client.call( - system_prompt=system_prompt, - user_prompt=user_prompt, - temperature=0.2, - expect_json=True - ) - code = llm_response.code - description = llm_response.description - commands = llm_response.commands - # 使用 handle_llm_response 处理文件写入,支持不同输出格式 - output_path = str(self.output_dir / file_path) - handle_llm_response(output_path, llm_response) - logger.info(f"文件已生成并写入: {output_path}") + result = self._call_llm(system_prompt, user_prompt) + code = result.get("code") + description = result.get("description", "") + commands = result.get("commands", []) + result.get("output_format", "full") + if code is None: + raise ValueError("LLM 响应中没有 code 字段") return code, description, commands except Exception as e: logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") @@ -383,6 +407,13 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 ) logger.info(f"生成完成: {file_path} - {desc}") + # 写入文件 + output_path = self.output_dir / file_path + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(code) + logger.info(f"已写入: {output_path}") + # 执行命令 for cmd in commands: logger.info(f"准备执行命令: {cmd}") @@ -402,16 +433,39 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。 如果检测到循环依赖,抛出ValueError。 """ - try: - sorted_files = dependency_topological_sort(dependencies) - # 确保所有文件都在排序列表中 - if set(sorted_files) != set(files): - logger.warning("依赖图可能不完整,调整排序列表") - sorted_files = [f for f in files if f in sorted_files] + [f for f in files if f not in sorted_files] - return sorted_files - except ValueError as e: - logger.error(f"拓扑排序失败: {e}") - raise + from collections import deque + + # 初始化入度和反向邻接表 + in_degree = {f: 0 for f in files} + rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f + + # 构建图:如果文件f依赖于dep,则增加f的入度,并将f加入rev_graph[dep] + for f in files: + for dep in dependencies.get(f, []): + if dep in files: # 只考虑在files中的依赖 + in_degree[f] += 1 # f依赖于dep,所以f的入度增加 + rev_graph[dep].append(f) # dep被f依赖 + + # 队列初始化为入度为0的文件(无依赖的文件) + queue = deque([f for f in files if in_degree[f] == 0]) + sorted_files = [] + + while queue: + node = queue.popleft() + sorted_files.append(node) + # 所有依赖于node的文件入度减1 + for dependent in rev_graph[node]: + in_degree[dependent] -= 1 + if in_degree[dependent] == 0: + queue.append(dependent) + + # 检查是否所有文件都已排序(无循环依赖) + if len(sorted_files) != len(files): + raise ValueError( + f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。" + ) + + return sorted_files def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: """ @@ -420,11 +474,44 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 Returns: bool: 命令是否成功执行 """ - success, output = self.command_executor.execute(cmd, cwd=str(cwd) if cwd else None) - if not success: - logger.error(f"命令执行失败: {cmd}, 输出: {output}") - self.console.print(f"[bold red]❌ 命令执行失败: {cmd}[/bold red]") - return success + dangerous, reason = is_dangerous_command(cmd) + if dangerous: + logger.error(f"危险命令被阻止: {cmd},原因: {reason}") + self.console.print( + f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]" + ) + return False + + logger.info(f"执行命令: {cmd}") + try: + result = subprocess.run( + cmd, + shell=True, + cwd=cwd or self.output_dir, + capture_output=True, + text=True, + timeout=300, # 5分钟超时 + ) + logger.debug(f"命令返回码: {result.returncode}") + if result.stdout: + logger.debug(f"stdout: {result.stdout[:500]}") + if result.stderr: + logger.warning(f"stderr: {result.stderr[:500]}") + if result.returncode != 0: + logger.error(f"命令执行失败,返回码: {result.returncode}") + self.console.print( + f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]" + ) + return False + return True + except subprocess.TimeoutExpired: + logger.error(f"命令执行超时: {cmd}") + self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]") + return False + except Exception as e: + logger.error(f"命令执行失败: {e}") + self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]") + return False def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]: """ @@ -444,8 +531,11 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 # 将现有 design.json 内容作为上下文的一部分 if not self.design: + design_path = self.output_dir / "design.json" try: - self.design = self.design_manager.load_design() + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") @@ -495,7 +585,9 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 if updated: # 保存更新后的 design.json - self.design_manager.save_design(self.design) + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) logger.info("design.json 已更新") def refresh_design(self) -> bool: @@ -541,8 +633,17 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 logger.info(f"开始更新design.json中文件条目: {file_path}") if not self.design: # 加载现有design.json + design_path = self.output_dir / "design.json" + if not design_path.exists(): + logger.error(f"design.json不存在于 {self.output_dir}") + self.console.print( + f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" + ) + return False try: - self.design = self.design_manager.load_design() + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + self.design = DesignModel(**design_data) except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") @@ -605,7 +706,9 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 logger.info(f"更新了design.json中的文件条目: {file_path}") # 保存更新后的design.json - self.design_manager.save_design(self.design) + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) logger.info(f"design.json已更新,文件条目: {file_path}") self.console.print( f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]" @@ -631,15 +734,25 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 ) return False try: - readme_content = utils_read_file(str(readme_path)) + with open(readme_path, "r", encoding="utf-8") as f: + readme_content = f.read() except Exception as e: logger.error(f"读取README.md失败: {e}") self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") return False # 加载design.json + design_path = self.output_dir / "design.json" + if not design_path.exists(): + logger.error(f"design.json不存在于 {self.output_dir}") + self.console.print( + f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" + ) + return False try: - design = self.design_manager.load_design() + with open(design_path, "r", encoding="utf-8") as f: + design_data = json.load(f) + design = DesignModel(**design_data) except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") @@ -671,23 +784,27 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 if update_type == "readme": # 更新README.md new_readme = updates.get("new_readme", readme_content) - utils_write_file(str(readme_path), new_readme) + with open(readme_path, "w", encoding="utf-8") as f: + f.write(new_readme) logger.info("已更新README.md") self.console.print("[green]✅ README.md已更新[/green]") elif update_type == "design": # 更新design.json new_design_data = updates.get("new_design", design.model_dump()) design = DesignModel(**new_design_data) - self.design_manager.save_design(design) + with open(design_path, "w", encoding="utf-8") as f: + json.dump(new_design_data, f, indent=2, ensure_ascii=False) logger.info("已更新design.json") self.console.print("[green]✅ design.json已更新[/green]") elif update_type == "both": # 更新两者 new_readme = updates.get("new_readme", readme_content) new_design_data = updates.get("new_design", design.model_dump()) - utils_write_file(str(readme_path), new_readme) + with open(readme_path, "w", encoding="utf-8") as f: + f.write(new_readme) design = DesignModel(**new_design_data) - self.design_manager.save_design(design) + with open(design_path, "w", encoding="utf-8") as f: + json.dump(new_design_data, f, indent=2, ensure_ascii=False) logger.info("已同步更新README.md和design.json") self.console.print("[green]✅ README.md和design.json已同步更新[/green]") else: @@ -701,18 +818,3 @@ class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 logger.error(f"同步README.md失败: {e}") self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") return False - - def run(self, readme_path: Path) -> None: - """ - 主执行流程,控制整个生成过程(占位符,具体实现取决于子类或调用) - 保持接口不变,内部逻辑可能调整 - """ - # 示例:读取README,生成设计,生成文件 - self.readme_content = self.parse_readme(readme_path) - self.design = self.generate_design_json() - files, dependencies = self.get_project_structure() - # 简化:直接生成所有文件 - for file_path in files: - instruction = f"生成文件 {file_path}" - self.generate_file(file_path, instruction, dependencies.get(file_path, [])) - logger.info("生成完成") diff --git a/src/llm_codegen/dependency_sorter.py b/src/llm_codegen/dependency_sorter.py deleted file mode 100644 index 20f8958..0000000 --- a/src/llm_codegen/dependency_sorter.py +++ /dev/null @@ -1,114 +0,0 @@ -""" -Module for dependency sorting and cycle detection. -Provides functions to perform topological sort and detect cycles in a dependency graph. -""" - -from collections import defaultdict, deque - - -def topological_sort(dependencies): - """ - Perform topological sort on a dependency graph. - - Args: - dependencies (dict): A dictionary where keys are nodes (e.g., file paths) and values are lists of dependencies. - Example: {"src/llm_codegen/core.py": ["src/llm_codegen/utils.py", "src/llm_codegen/models.py"], ...} - - Returns: - list: A list of nodes in topological order. - Raises: - ValueError: If a cycle is detected in the graph, with details of the cycle. - """ - # Kahn's algorithm for topological sort - graph = defaultdict(list) - in_degree = defaultdict(int) - - # Build graph and compute in-degree - for node, deps in dependencies.items(): - graph[node] = deps[:] # Copy to avoid modification - for dep in deps: - in_degree[dep] += 1 - # Ensure all nodes are included in in_degree - if node not in in_degree: - in_degree[node] = 0 - - # Initialize queue with nodes having zero in-degree - queue = deque([node for node in in_degree if in_degree[node] == 0]) - sorted_nodes = [] - - while queue: - node = queue.popleft() - sorted_nodes.append(node) - for neighbor in graph.get(node, []): - in_degree[neighbor] -= 1 - if in_degree[neighbor] == 0: - queue.append(neighbor) - - # Check for cycles - if len(sorted_nodes) != len(in_degree): - # Cycle detected, find and report it - cycle = _detect_cycle_dfs(dependencies) - raise ValueError(f"Cycle detected in dependency graph: {cycle}") - - return sorted_nodes - - -def _detect_cycle_dfs(dependencies): - """ - Internal helper function to detect a cycle in a dependency graph using DFS. - - Args: - dependencies (dict): Same as topological_sort. - - Returns: - list: A list of nodes forming a cycle if found, else an empty list. - """ - graph = defaultdict(list) - for node, deps in dependencies.items(): - graph[node] = deps[:] - - visited = set() - rec_stack = set() - cycle = [] - - def dfs(node, path): - nonlocal cycle - visited.add(node) - rec_stack.add(node) - path.append(node) - - for neighbor in graph.get(node, []): - if neighbor not in visited: - if dfs(neighbor, path): - return True - elif neighbor in rec_stack: - # Cycle detected, extract from path - start_index = path.index(neighbor) - cycle = path[start_index:] + [neighbor] - return True - - rec_stack.remove(node) - path.pop() - return False - - for node in graph: - if node not in visited: - if dfs(node, []): - return cycle - return [] - - -def detect_cycles(dependencies): - """ - Detect cycles in a dependency graph. - - Args: - dependencies (dict): Same as topological_sort. - - Returns: - tuple: (has_cycle, cycle_nodes), where has_cycle is a boolean, and cycle_nodes is a list if cycle found. - """ - cycle = _detect_cycle_dfs(dependencies) - if cycle: - return True, cycle - return False, [] diff --git a/src/llm_codegen/design_manager.py b/src/llm_codegen/design_manager.py deleted file mode 100644 index 14a2a5a..0000000 --- a/src/llm_codegen/design_manager.py +++ /dev/null @@ -1,106 +0,0 @@ -import json -from pathlib import Path -from typing import Optional, Dict, Any -from .models import DesignModel, FileModel - - -class DesignManager: - """管理 design.json 的加载、保存、更新和同步操作。""" - - def __init__(self, design_file_path: str = "design.json") -> None: - """ - 初始化 DesignManager。 - - 参数: - design_file_path: design.json 文件的路径,默认为当前目录下的 design.json。 - """ - self.design_file_path = Path(design_file_path) - self.design: Optional[DesignModel] = None - - def load_design(self) -> DesignModel: - """ - 从文件加载 design.json 并解析为 DesignModel。 - - 返回: - DesignModel 实例。 - - 异常: - FileNotFoundError: 如果文件不存在。 - ValueError: 如果 JSON 解析失败或模型验证失败。 - """ - if not self.design_file_path.exists(): - raise FileNotFoundError(f"Design file not found: {self.design_file_path}") - - with open(self.design_file_path, 'r', encoding='utf-8') as f: - data = json.load(f) - - self.design = DesignModel(**data) - return self.design - - def save_design(self, design: Optional[DesignModel] = None) -> None: - """ - 将 DesignModel 保存到 design.json 文件。 - - 参数: - design: 要保存的 DesignModel 实例。如果为 None,则使用 self.design。 - - 异常: - ValueError: 如果 design 为 None 且 self.design 也为 None。 - """ - if design is None: - design = self.design - if design is None: - raise ValueError("No design data to save.") - - with open(self.design_file_path, 'w', encoding='utf-8') as f: - json.dump(design.dict(), f, ensure_ascii=False, indent=2) - - def update_design(self, updates: Dict[str, Any]) -> None: - """ - 更新当前设计数据。 - - 参数: - updates: 一个字典,包含要更新的字段和值,符合 DesignModel 结构。 - 例如,{"project_name": "new_name", "files": [...]}。 - - 注意: - 此方法直接修改 self.design,如果 self.design 为 None,则先加载。 - """ - if self.design is None: - self.load_design() - - # 合并更新到现有设计数据 - current_data = self.design.dict() - current_data.update(updates) - self.design = DesignModel(**current_data) - - def sync_design(self) -> None: - """ - 同步设计数据,合并所有文件的 design_updates 到主设计。 - """ - if self.design is None: - self.load_design() - - for file_model in self.design.files: - if file_model.design_updates: - # 使用文件模型的 merge_design_updates 方法合并更新 - file_model.merge_design_updates(file_model.design_updates) - # 清空 design_updates,表示已同步 - file_model.design_updates = {} - - # 保存同步后的设计 - self.save_design() - - -# 便捷函数 - -def load_design(file_path: str) -> DesignModel: - """便捷函数,加载设计文件。""" - manager = DesignManager(file_path) - return manager.load_design() - - -def save_design(design: DesignModel, file_path: str) -> None: - """便捷函数,保存设计文件。""" - manager = DesignManager(file_path) - manager.save_design(design) diff --git a/src/llm_codegen/file_operations.py b/src/llm_codegen/file_operations.py deleted file mode 100644 index a583c41..0000000 --- a/src/llm_codegen/file_operations.py +++ /dev/null @@ -1,148 +0,0 @@ -import os -import pathlib -import difflib -from typing import List, Optional - -from .models import LLMResponse, OutputFormat - - -def read_file(file_path: str) -> str: - """ - 读取文件内容并返回字符串。 - - 参数: - file_path: 文件路径字符串。 - - 返回: - 文件内容的字符串。 - - 异常: - FileNotFoundError: 如果文件不存在。 - UnicodeDecodeError: 如果编码问题。 - """ - path = pathlib.Path(file_path) - if not path.exists(): - raise FileNotFoundError(f"File not found: {file_path}") - return path.read_text(encoding='utf-8') - - -def write_file(file_path: str, content: str) -> None: - """ - 写入内容到文件,如果目录不存在则创建。 - - 参数: - file_path: 文件路径字符串。 - content: 要写入的内容字符串。 - """ - path = pathlib.Path(file_path) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(content, encoding='utf-8') - - -def ensure_directory_exists(dir_path: str) -> None: - """ - 确保目录存在,如果不存在则创建。 - - 参数: - dir_path: 目录路径字符串。 - """ - path = pathlib.Path(dir_path) - path.mkdir(parents=True, exist_ok=True) - - -def generate_diff(old_content: str, new_content: str) -> str: - """ - 生成 unified diff 字符串,比较旧内容和新内容。 - - 参数: - old_content: 旧内容字符串。 - new_content: 新内容字符串。 - - 返回: - unified diff 格式的字符串。 - """ - old_lines = old_content.splitlines(keepends=True) - new_lines = new_content.splitlines(keepends=True) - diff = difflib.unified_diff(old_lines, new_lines, fromfile='old', tofile='new') - return ''.join(diff) - - -def apply_diff(file_path: str, diff_content: str) -> None: - """ - 应用 unified diff 到文件,假设 diff_content 是有效的 diff 字符串。 - 这是一个简化实现,可能不处理所有 diff 情况。 - - 参数: - file_path: 要应用差异的文件路径字符串。 - diff_content: unified diff 格式的字符串。 - - 异常: - FileNotFoundError: 如果文件不存在。 - ValueError: 如果 diff 无法应用。 - """ - # 读取当前文件内容 - current_content = read_file(file_path) - current_lines = current_content.splitlines(keepends=True) - diff_lines = diff_content.splitlines(keepends=True) - - # 解析 diff 并生成新内容 - new_lines = [] - i = 0 - j = 0 # 索引用于 current_lines - while i < len(diff_lines): - line = diff_lines[i] - if line.startswith('---') or line.startswith('+++'): - i += 1 - continue - elif line.startswith('@@'): - # 解析范围信息,简化跳过 - i += 1 - continue - elif line.startswith(' '): - # 上下文行,应该匹配当前内容 - if j < len(current_lines) and current_lines[j] == line[1:]: - new_lines.append(current_lines[j]) - j += 1 - i += 1 - else: - raise ValueError(f"Patch does not apply at line {i}: context mismatch") - elif line.startswith('-'): - # 删除行,跳过当前内容中的对应行 - if j < len(current_lines) and current_lines[j] == line[1:]: - j += 1 - else: - raise ValueError(f"Patch does not apply at line {i}: deletion mismatch") - i += 1 - elif line.startswith('+'): - # 添加行 - new_lines.append(line[1:]) - i += 1 - else: - i += 1 - - # 添加剩余当前内容(如果有) - while j < len(current_lines): - new_lines.append(current_lines[j]) - j += 1 - - new_content = ''.join(new_lines) - write_file(file_path, new_content) - - -def handle_llm_response(file_path: str, response: LLMResponse) -> None: - """ - 处理 LLM 响应,根据 output_format 写入完整代码或应用差异。 - - 参数: - file_path: 目标文件路径字符串。 - response: LLMResponse 实例,包含代码、描述、命令和输出格式。 - - 异常: - ValueError: 如果不支持的 output_format。 - """ - if response.output_format == OutputFormat.FULL: - write_file(file_path, response.code) - elif response.output_format == OutputFormat.DIFF: - apply_diff(file_path, response.code) - else: - raise ValueError(f"Unsupported output format: {response.output_format}") diff --git a/src/llm_codegen/init_generator.py b/src/llm_codegen/init_generator.py index 4baea62..e4d63c0 100644 --- a/src/llm_codegen/init_generator.py +++ b/src/llm_codegen/init_generator.py @@ -1,12 +1,12 @@ import json from pathlib import Path from typing import Optional -from .core import CodeGenerator +from .core import BaseGenerator from loguru import logger # 确保日志可用 -class InitGenerator(CodeGenerator): - """处理 init 命令的生成器类,继承自 CodeGenerator,用于从 README 初始化项目。""" +class InitGenerator(BaseGenerator): + """处理 init 命令的生成器类,继承自 BaseGenerator,用于从 README 初始化项目。""" def __init__( self, diff --git a/src/llm_codegen/llm_client.py b/src/llm_codegen/llm_client.py deleted file mode 100644 index 653a5d6..0000000 --- a/src/llm_codegen/llm_client.py +++ /dev/null @@ -1,147 +0,0 @@ -import json -import logging -from datetime import datetime -from typing import Optional - -import openai -from pydantic import BaseModel - -from .models import LLMResponse - - -class LLMClient: - """LLM客户端类,封装API调用、响应保存和思考过程记录。""" - - def __init__( - self, - api_key: str, - model: str = "gpt-3.5-turbo", - base_url: Optional[str] = None, - log_level: str = "INFO", - ): - """ - 初始化LLM客户端。 - - 参数: - api_key: LLM API密钥。 - model: 使用的模型,默认为"gpt-3.5-turbo"。 - base_url: API基础URL,可选,用于自定义端点。 - log_level: 日志级别,默认为"INFO"。 - """ - self.api_key = api_key - self.model = model - self.base_url = base_url - # 初始化OpenAI客户端 - if self.base_url: - self.client = openai.OpenAI(api_key=api_key, base_url=base_url) - else: - self.client = openai.OpenAI(api_key=api_key) - # 配置日志 - self.logger = logging.getLogger(__name__) - self.logger.setLevel(getattr(logging, log_level.upper())) - if not self.logger.handlers: - handler = logging.StreamHandler() - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - - def call( - self, - system_prompt: str, - user_prompt: str, - temperature: float = 0.7, - expect_json: bool = True, - ) -> LLMResponse: - """ - 调用LLM并返回解析后的响应。 - - 参数: - system_prompt: 系统提示。 - user_prompt: 用户提示。 - temperature: 温度参数,控制随机性。 - expect_json: 是否期望JSON响应,默认为True。 - - 返回: - LLMResponse对象。 - - 异常: - 抛出任何调用或解析错误。 - """ - # 记录思考过程开始 - self.logger.info("Starting LLM call.") - self.logger.debug(f"System prompt (first 100 chars): {system_prompt[:100]}") - self.logger.debug(f"User prompt (first 100 chars): {user_prompt[:100]}") - self.logger.debug(f"Temperature: {temperature}, Expect JSON: {expect_json}") - - try: - # 构建请求消息 - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - # 调用LLM API - response = self.client.chat.completions.create( - model=self.model, - messages=messages, - temperature=temperature, - response_format={"type": "json_object"} if expect_json else None, - ) - content = response.choices[0].message.content - self.logger.info("LLM call successful.") - - # 解析响应 - if expect_json: - data = json.loads(content) - llm_response = LLMResponse(**data) - else: - # 如果不是JSON,创建默认LLMResponse - llm_response = LLMResponse( - code=content, - description="Generated from non-JSON response", - commands=[], - output_format="full", - ) - - # 记录思考过程结束 - self.logger.info(f"Response parsed: {llm_response}") - - # 自动保存响应 - self.save_response(llm_response, system_prompt, user_prompt) - - return llm_response - except Exception as e: - self.logger.error(f"LLM call failed: {e}") - raise - - def save_response( - self, - response: LLMResponse, - system_prompt: str, - user_prompt: str, - file_path: Optional[str] = None, - ): - """ - 保存LLM响应到文件。 - - 参数: - response: LLMResponse对象。 - system_prompt: 系统提示。 - user_prompt: 用户提示。 - file_path: 文件路径,可选。如果未提供,使用默认路径。 - """ - if file_path is None: - # 默认保存到logs目录 - import os - os.makedirs("logs", exist_ok=True) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - file_path = f"logs/llm_response_{timestamp}.json" - - data = { - "timestamp": datetime.now().isoformat(), - "system_prompt": system_prompt, - "user_prompt": user_prompt, - "response": response.dict(), - } - with open(file_path, 'w') as f: - json.dump(data, f, indent=2) - self.logger.info(f"Response saved to {file_path}") diff --git a/src/llm_codegen/state_manager.py b/src/llm_codegen/state_manager.py deleted file mode 100644 index 02afa95..0000000 --- a/src/llm_codegen/state_manager.py +++ /dev/null @@ -1,78 +0,0 @@ -''' -状态管理器,用于管理断点续写状态文件的读写(线程安全)。 -''' -import json -from pathlib import Path -from threading import Lock -from typing import Optional - -from .models import StateModel - - -class StateManager: - '''状态管理器类,提供线程安全的状态文件读写操作。''' - - def __init__(self, state_file_path: str): - ''' - 初始化状态管理器。 - - 参数: - state_file_path: 状态文件的路径。 - ''' - self.state_file_path = Path(state_file_path) - self.lock = Lock() - - def read_state(self) -> Optional[StateModel]: - ''' - 从文件读取状态。 - - 返回: - StateModel 实例,如果文件不存在则返回 None。 - ''' - with self.lock: - if not self.state_file_path.exists(): - return None - try: - with open(self.state_file_path, 'r', encoding='utf-8') as f: - data = json.load(f) - return StateModel(**data) - except (json.JSONDecodeError, IOError) as e: - raise RuntimeError(f"Failed to read state file: {e}") from e - - def write_state(self, state: StateModel) -> None: - ''' - 将状态写入文件。 - - 参数: - state: StateModel 实例。 - ''' - with self.lock: - try: - with open(self.state_file_path, 'w', encoding='utf-8') as f: - json.dump(state.dict(), f, indent=2, ensure_ascii=False) - except IOError as e: - raise RuntimeError(f"Failed to write state file: {e}") from e - - def initialize_state(self, total_files: int, output_dir: str, readme_path: str) -> StateModel: - ''' - 初始化状态并保存到文件。 - - 参数: - total_files: 总文件数。 - output_dir: 输出目录。 - readme_path: README 文件路径。 - - 返回: - 初始化的 StateModel 实例。 - ''' - state = StateModel( - current_file_index=0, - generated_files=[], - dependencies_map={}, - file_statuses={}, - total_files=total_files, - output_dir=output_dir, - readme_path=readme_path - ) - self.write_state(state) - return state