# 需求工单:拆分 core.py 为多个专注模块 name: 重构 core.py,拆分为多个单一职责的模块 description: | 当前 core.py 文件过于庞大,包含 LLM 调用、文件操作、命令执行、依赖排序、状态管理、设计文件维护等多个职责,导致代码难以维护和测试。需要将其拆分为多个独立的模块,每个模块负责一个清晰的功能领域,并通过组合方式在 BaseGenerator 中集成。 主要拆分目标: - 创建 `llm_client.py`:封装 LLM API 调用、响应保存和思考过程记录。 - 创建 `file_operations.py`:处理文件读写、目录创建和 diff 应用。 - 创建 `command_executor.py`:执行系统命令,集成危险命令拦截。 - 创建 `dependency_sorter.py`:提供依赖关系的拓扑排序及循环检测。 - 创建 `design_manager.py`:管理 design.json 的加载、保存、更新及同步操作。 - 创建 `state_manager.py`:管理断点续写状态文件的读写(线程安全)。 - 精简 `core.py` 中的 BaseGenerator,使其组合以上组件,保留对外接口不变。 同时需要更新 README.md,反映新的模块结构和设计思想。 affected_files: - src/llm_codegen/core.py - src/llm_codegen/llm_client.py # 新增 - src/llm_codegen/file_operations.py # 新增 - src/llm_codegen/command_executor.py # 新增 - src/llm_codegen/dependency_sorter.py # 新增 - src/llm_codegen/design_manager.py # 新增 - src/llm_codegen/state_manager.py # 新增 - README.md acceptance_criteria: - 所有原有功能(init、enhance、fix、design 子命令)在重构后行为完全一致,不引入新 bug。 - core.py 中的 _call_llm、_topological_sort、文件读写、命令执行、状态保存、design 操作等逻辑均迁移至对应新模块,BaseGenerator 仅保留组合与高层流程。 - 新模块职责单一,相互之间通过明确的接口调用,无循环依赖。 - 单元测试覆盖核心功能,且原有测试用例全部通过。 - README.md 中的“项目结构”部分更新为新文件列表,并简要说明各模块职责。 - 日志记录、进度显示、错误提示等用户体验相关功能保持不变。 - 代码风格符合项目规范(通过 black、pylint 等检查)。 - 生成对应的单元测试。 > 原core.py内容为: ```python import json import os import subprocess import sys import concurrent.futures import pendulum from typing import List, Dict, Optional, Any, Tuple from pathlib import Path from collections import deque import threading from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskID from loguru import logger from openai import OpenAI from .utils import is_dangerous_command from .models import ( DesignModel, StateModel, FileModel, FileStatus, ) # 添加 FileStatus 导入 class CodeGenerator: # 修改为 CodeGenerator 以符合设计文件 """代码生成器基类,封装公共逻辑,支持设计层、断点续写和命令执行""" def __init__( self, api_key: Optional[str] = None, base_url: str = "https://api.deepseek.com", model: str = "deepseek-reasoner", output_dir: str = "./generated", log_file: Optional[str] = None, max_concurrency: int = 4, ): """ 初始化生成器 Args: api_key: OpenAI API密钥,默认从环境变量DEEPSEEK_APIKEY读取 base_url: API基础URL model: 使用的模型 output_dir: 输出根目录 log_file: 日志文件路径,默认自动生成 """ self.api_key = api_key or os.getenv("DEEPSEEK_APIKEY") if not self.api_key: raise ValueError("必须提供API密钥,或设置环境变量DEEPSEEK_APIKEY") self.client = OpenAI(api_key=self.api_key, base_url=base_url) self.model = model self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.state_file = self.output_dir / ".llm_generator_state.json" self.console = Console() # 添加console实例用于rich打印 self._state_lock = threading.Lock() self.max_concurrency = max_concurrency # 配置日志 if log_file is None: log_file = self.output_dir / "generator.log" logger.remove() # 移除默认handler logger.add(sys.stderr, level="WARNING") # 控制台输出WARNING及以上 logger.add(log_file, rotation="10 MB", level="DEBUG") # 文件记录DEBUG logger.info(f"日志已初始化,保存至: {log_file}") self.readme_content = None self.design: Optional[DesignModel] = None self.state: Optional[StateModel] = None self.progress: Optional[Progress] = None self.tasks: Dict[str, TaskID] = {} # 任务ID映射 def _call_llm( self, system_prompt: str, user_prompt: str, temperature: float = 0.2, expect_json: bool = True, ) -> Dict[str, Any]: """ 调用LLM并返回解析后的JSON """ logger.debug(f"调用LLM,模型: {self.model}") try: response = self.client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], temperature=temperature, response_format={"type": "json_object"} if expect_json else None, ) message = response.choices[0].message content = message.content # 记录思考过程(如果存在) reasoning_content = None if hasattr(message, "reasoning_content") and message.reasoning_content: reasoning_content = message.reasoning_content logger.info("模型思考过程已记录") # 创建响应目录 responses_dir = self.output_dir / "llm_responses" responses_dir.mkdir(parents=True, exist_ok=True) # 生成文件名(使用当前时间) timestamp = pendulum.now().format("YYYYMMDD_HHmmss_SSS") response_file = responses_dir / f"response_{timestamp}.json" # 保存响应到JSON文件 response_data = { "timestamp": timestamp, "model": self.model, "content": content, "reasoning_content": reasoning_content, "system_prompt": system_prompt, "user_prompt": user_prompt, "temperature": temperature, "expect_json": expect_json, } with open(response_file, "w", encoding="utf-8") as f: json.dump(response_data, f, indent=2, ensure_ascii=False) logger.debug(f"LLM原始响应: {response_file.name}") if expect_json: result = json.loads(content) else: result = {"content": content} return result except json.JSONDecodeError as e: logger.error(f"JSON解析失败: {e}") self.console.print(f"[bold red]❌ JSON解析失败: {e}[/bold red]") raise ValueError(f"LLM返回的不是有效JSON: {content[:200]}") except Exception as e: logger.error(f"LLM调用失败: {e}") self.console.print(f"[bold red]❌ LLM调用失败: {e}[/bold red]") raise def parse_readme(self, readme_path: Path) -> str: """ 读取README文件内容 """ logger.info(f"读取README文件: {readme_path}") try: with open(readme_path, "r", encoding="utf-8") as f: content = f.read() logger.debug(f"README内容长度: {len(content)} 字符") return content except Exception as e: logger.error(f"读取README失败: {e}") self.console.print(f"[bold red]❌ 读取README失败: {e}[/bold red]") raise def generate_design_json(self) -> DesignModel: """ 调用LLM生成design.json内容,并解析为DesignModel """ system_prompt = ( "你是一个软件架构师。请根据README描述,生成项目的中间设计文件design.json。" "design.json应包含项目名称、版本、描述、文件列表(含路径、摘要、依赖、函数和类)、建议命令和检查工具。" "返回严格的 JSON 对象,符合DesignModel结构。" ) user_prompt = f"README内容如下:\n\n{self.readme_content}" result = self._call_llm(system_prompt, user_prompt) design_data = result design = DesignModel(**design_data) # 写入design.json文件 design_path = self.output_dir / "design.json" with open(design_path, "w", encoding="utf-8") as f: json.dump(design.model_dump(), f, indent=2, ensure_ascii=False) logger.info(f"已生成design.json: {design_path}") return design def load_state(self) -> Optional[StateModel]: """加载断点续写状态""" if self.state_file.exists(): try: with open(self.state_file, "r", encoding="utf-8") as f: state_data = json.load(f) self.state = StateModel(**state_data) logger.info( f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个" ) return self.state except Exception as e: logger.error(f"加载状态失败: {e}") self.console.print(f"[bold red]❌ 加载状态失败: {e}[/bold red]") return None return None def save_state( self, generated_files: List[str], dependencies_map: Dict[str, List[str]] ) -> None: """保存断点续写状态,适应并发生成(线程安全)""" with self._state_lock: # 串行化写入 state = StateModel( current_file_index=0, generated_files=generated_files, dependencies_map=dependencies_map, total_files=len(self.design.files) if self.design else 0, output_dir=str(self.output_dir), readme_path=self.readme_content[:100] if self.readme_content else "", ) with open(self.state_file, "w", encoding="utf-8") as f: json.dump(state.model_dump(), f, indent=2, ensure_ascii=False) logger.debug(f"状态已保存: {self.state_file}") def get_project_structure(self) -> Tuple[List[str], Dict[str, List[str]]]: """ 从design.json获取文件列表和依赖关系 Returns: (files, dependencies) files: 按顺序需要生成的文件路径列表 dependencies: 字典 {file: [依赖文件路径]} """ if not self.design: raise ValueError("design.json未加载,请先调用generate_design_json") files = [file.path for file in self.design.files] dependencies = {file.path: file.dependencies for file in self.design.files} logger.info(f"从design.json解析到 {len(files)} 个待生成文件") logger.debug(f"文件列表: {files}") logger.debug(f"依赖关系: {dependencies}") return files, dependencies def _add_implicit_dependencies( self, files: List[str], dependencies: Dict[str, List[str]] ) -> Dict[str, List[str]]: """ 添加隐式依赖关系,基于文件路径和常见模式 Args: files: 文件路径列表 dependencies: 原始依赖字典 Returns: Dict[str, List[str]]: 增强后的依赖字典 """ enhanced = dependencies.copy() for file in files: if file not in enhanced: enhanced[file] = [] # 添加同一目录下的其他文件作为隐式依赖(简单示例) path = Path(file) implicit_deps = [ f for f in files if f != file and Path(f).parent == path.parent and f not in enhanced[file] ] if implicit_deps: enhanced[file].extend(implicit_deps) logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}") return enhanced def generate_file( self, file_path: str, prompt_instruction: str, dependency_files: List[str], existing_content: Optional[str] = None, output_format: str = "full", # 新增参数,默认 'full' ) -> Tuple[str, str, List[str]]: """ 生成单个文件,返回 (代码, 描述, 命令列表) Args: file_path: 目标文件路径 prompt_instruction: 生成指令 dependency_files: 依赖文件列表(用于上下文) existing_content: 文件现有内容(若为修改模式) output_format: 输出格式,'full',来自 models.py """ # 收集上下文内容 context_content = [] if self.readme_content: context_content.append(f"### 项目 README ###\n{self.readme_content}\n") # 添加 design.json 上下文 design_path = self.output_dir / "design.json" if design_path.exists(): try: with open(design_path, "r", encoding="utf-8") as f: design_content = f.read() context_content.append( f"### 设计文件: design.json ###\n{design_content}\n" ) except Exception as e: logger.error(f"读取design.json失败: {e}") self.console.print(f"[bold red]❌ 读取design.json失败: {e}[/bold red]") # 如果design.json读取失败,可能无法继续,但保持上下文为空或部分 # 添加依赖文件内容(仅读取存在的文件) for dep in dependency_files: dep_path = Path(dep) if not dep_path.exists(): alt_path = self.output_dir / dep if alt_path.exists(): dep_path = alt_path else: logger.warning(f"依赖文件不存在,已跳过: {dep}") self.console.print( f"[yellow]⚠ 依赖文件不存在,已跳过: {dep}[/yellow]" ) continue try: with open(dep_path, "r", encoding="utf-8") as f: content = f.read() context_content.append( f"### 文件: {dep_path.name} (路径: {dep}) ###\n{content}\n" ) except Exception as e: logger.error(f"读取依赖文件 {dep} 失败: {e}") self.console.print( f"[bold red]❌ 读取依赖文件 {dep} 失败: {e}[/bold red]" ) # 跳过此依赖文件 # 如果有现有内容,也加入上下文 if existing_content is not None: context_content.append( f"### 当前文件内容 ({file_path}) ###\n{existing_content}\n" ) full_context = "\n".join(context_content) # output_format 为 'full' 或其他,保持现有逻辑 if existing_content is not None: system_prompt = ( "你是一个专业的编程助手。根据用户指令和提供的上下文文件,**修改**现有的代码文件。" "返回严格的 JSON 对象,包含四个字段:\n" "- code: (string) 修改后的完整代码\n" "- description: (string) 简短的中文修改描述\n" "- commands: (array of string) 修改此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" "- output_format: (string) 应为 'full'" ) else: system_prompt = ( "你是一个专业的编程助手。根据用户指令和提供的上下文文件,生成完整的代码。" "返回严格的 JSON 对象,包含四个字段:\n" "- code: (string) 生成的完整代码\n" "- description: (string) 简短的中文功能描述\n" "- commands: (array of string) 生成此文件后需要执行的操作系统命令列表(如编译、安装依赖等),若无则返回空数组\n" "- output_format: (string) 应为 'full'" ) user_prompt = f"{prompt_instruction}\n\n参考文件上下文:\n{full_context}" try: result = self._call_llm(system_prompt, user_prompt) code = result.get("code") description = result.get("description", "") commands = result.get("commands", []) result.get("output_format", "full") if code is None: raise ValueError("LLM 响应中没有 code 字段") return code, description, commands except Exception as e: logger.error(f"生成文件 {file_path} 时调用LLM失败: {e}") self.console.print( f"[bold red]❌ 生成文件 {file_path} 时调用LLM失败: {e}[/bold red]" ) # 返回默认值以便继续 return "# 生成失败,请检查日志", "生成失败,发生错误", [] def _generate_file_task( self, file_path: str, dependencies: List[str], generated_files: set ) -> Tuple[bool, str]: """ 并发任务函数,用于生成单个文件 Args: file_path: 文件路径 dependencies: 依赖文件列表 generated_files: 已生成文件的集合(用于上下文) Returns: Tuple[bool, str]: (是否成功, 错误信息或空字符串) """ try: instruction = ( f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" ) # 过滤依赖文件,只使用已生成的 available_deps = [dep for dep in dependencies if dep in generated_files] code, desc, commands = self.generate_file( file_path, instruction, available_deps ) logger.info(f"生成完成: {file_path} - {desc}") # 写入文件 output_path = self.output_dir / file_path output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w", encoding="utf-8") as f: f.write(code) logger.info(f"已写入: {output_path}") # 执行命令 for cmd in commands: logger.info(f"准备执行命令: {cmd}") success = self.execute_command(cmd, cwd=self.output_dir) if not success: logger.warning(f"命令执行失败,但继续处理: {cmd}") return True, "" except Exception as e: logger.error(f"生成文件 {file_path} 失败: {e}") return False, str(e) def _topological_sort( self, files: List[str], dependencies: Dict[str, List[str]] ) -> List[str]: """ 对文件列表进行拓扑排序,基于依赖关系。 返回排序后的列表,满足每个文件的依赖项都出现在该文件之前。 如果检测到循环依赖,抛出ValueError。 """ from collections import deque # 初始化入度和反向邻接表 in_degree = {f: 0 for f in files} rev_graph = {f: [] for f in files} # 记录哪些文件依赖于f # 构建图:如果文件f依赖于dep,则增加f的入度,并将f加入rev_graph[dep] for f in files: for dep in dependencies.get(f, []): if dep in files: # 只考虑在files中的依赖 in_degree[f] += 1 # f依赖于dep,所以f的入度增加 rev_graph[dep].append(f) # dep被f依赖 # 队列初始化为入度为0的文件(无依赖的文件) queue = deque([f for f in files if in_degree[f] == 0]) sorted_files = [] while queue: node = queue.popleft() sorted_files.append(node) # 所有依赖于node的文件入度减1 for dependent in rev_graph[node]: in_degree[dependent] -= 1 if in_degree[dependent] == 0: queue.append(dependent) # 检查是否所有文件都已排序(无循环依赖) if len(sorted_files) != len(files): raise ValueError( f"检测到循环依赖,排序失败。已排序 {len(sorted_files)} 个文件,总共 {len(files)} 个文件。" ) return sorted_files def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: """ 执行单个命令,检查风险,失败仅记录错误不抛出异常 Returns: bool: 命令是否成功执行 """ dangerous, reason = is_dangerous_command(cmd) if dangerous: logger.error(f"危险命令被阻止: {cmd},原因: {reason}") self.console.print( f"[bold red]❌ 危险命令被阻止: {cmd},原因: {reason}[/bold red]" ) return False logger.info(f"执行命令: {cmd}") try: result = subprocess.run( cmd, shell=True, cwd=cwd or self.output_dir, capture_output=True, text=True, timeout=300, # 5分钟超时 ) logger.debug(f"命令返回码: {result.returncode}") if result.stdout: logger.debug(f"stdout: {result.stdout[:500]}") if result.stderr: logger.warning(f"stderr: {result.stderr[:500]}") if result.returncode != 0: logger.error(f"命令执行失败,返回码: {result.returncode}") self.console.print( f"[bold red]❌ 命令执行失败,返回码: {result.returncode}[/bold red]" ) return False return True except subprocess.TimeoutExpired: logger.error(f"命令执行超时: {cmd}") self.console.print(f"[bold red]❌ 命令执行超时: {cmd}[/bold red]") return False except Exception as e: logger.error(f"命令执行失败: {e}") self.console.print(f"[bold red]❌ 命令执行失败: {e}[/bold red]") return False def _analyze_issue(self, issue_content: str, issue_type: str) -> Dict[str, Any]: """ 调用 LLM 分析工单,返回结构化变更计划 """ system_prompt = ( "你是一个软件架构师。根据用户提供的工单内容和现有项目设计文件(design.json)," "分析需要进行的代码变更。返回严格的 JSON 对象,包含以下字段:\n" "- affected_files: 数组,每个元素为一个对象,包含:\n" " - path: 文件路径(相对于项目根目录)\n" " - action: 'create' 或 'modify'\n" " - description: 对此文件变更的简短描述\n" " - dependencies: 此文件可能依赖的其他文件路径列表(可选)\n" "- design_updates: 对象,描述对 design.json 的更新,例如新增的文件条目、修改的摘要等(可选)\n" "注意:仅返回 JSON,不要包含其他文本。" ) # 将现有 design.json 内容作为上下文的一部分 if not self.design: design_path = self.output_dir / "design.json" try: with open(design_path, "r", encoding="utf-8") as f: design_data = json.load(f) self.design = DesignModel(**design_data) except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") raise e design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False) user_prompt = ( f"工单类型: {issue_type}\n" f"工单内容:\n{issue_content}\n\n" f"现有设计文件 (design.json):\n{design_str}" ) result = self._call_llm(system_prompt, user_prompt, temperature=0.2) return result def _update_design( self, generated_files: List[str], design_updates: Dict[str, Any] ): """ 根据生成的变更更新 design.json 使用 FileModel 来处理文件信息 """ updated = False # 处理新增文件 for file_path in generated_files: # 检查文件是否已在 design.files 中 exists = any(f.path == file_path for f in self.design.files) if not exists: # 获取更新信息 update_info = design_updates.get(file_path, {}) # 创建新文件条目(FileModel实例) new_file = FileModel( path=file_path, summary=update_info.get("summary", "自动生成的新文件"), dependencies=update_info.get("dependencies", []), functions=update_info.get("functions", []), classes=update_info.get("classes", []), design_updates=update_info.get("design_updates", {}), ) self.design.files.append(new_file) updated = True logger.info(f"已将新文件 {file_path} 添加到 design.json") # 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要) # 这里可根据实际需求扩展,当前仅处理新增文件 if updated: # 保存更新后的 design.json design_path = self.output_dir / "design.json" with open(design_path, "w", encoding="utf-8") as f: json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) logger.info("design.json 已更新") def refresh_design(self) -> bool: """ 重新生成design.json,基于当前README内容或加载的design.json 返回bool表示是否成功 """ logger.info("开始刷新design.json") if not self.readme_content: # 尝试读取README.md文件 readme_path = self.output_dir / "README.md" if readme_path.exists(): try: self.readme_content = self.parse_readme(readme_path) except Exception as e: logger.error(f"读取README.md失败,无法刷新design: {e}") self.console.print( f"[bold red]❌ 读取README.md失败,无法刷新design: {e}[/bold red]" ) return False else: logger.error("没有README内容,且README.md文件不存在,无法刷新design") self.console.print( "[bold red]❌ 没有README内容,且README.md文件不存在,无法刷新design[/bold red]" ) return False try: self.design = self.generate_design_json() logger.info("design.json已成功重新生成") self.console.print("[green]✅ design.json已重新生成[/green]") return True except Exception as e: logger.error(f"重新生成design.json失败: {e}") self.console.print(f"[bold red]❌ 重新生成design.json失败: {e}[/bold red]") return False def update_file_entry(self, file_path: str, file_content: str) -> bool: """ 更新design.json中单个文件的条目,基于提供的文件内容 返回bool表示是否成功 """ logger.info(f"开始更新design.json中文件条目: {file_path}") if not self.design: # 加载现有design.json design_path = self.output_dir / "design.json" if not design_path.exists(): logger.error(f"design.json不存在于 {self.output_dir}") self.console.print( f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" ) return False try: with open(design_path, "r", encoding="utf-8") as f: design_data = json.load(f) self.design = DesignModel(**design_data) except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") return False # 调用LLM分析文件内容,返回更新信息,增强以支持design_updates字段 system_prompt = ( "你是一个软件架构师。分析给定的文件内容,并返回对design.json中该文件条目的更新。" "返回严格的JSON对象,包含以下字段:\n" "- summary: 文件的新摘要\n" "- dependencies: 依赖文件列表\n" "- functions: 函数列表,每个对象有name, summary, inputs, outputs\n" "- classes: 类列表,每个对象有name, summary, methods\n" "- design_updates: 可选,设计更新字典\n" "注意:仅返回JSON,不要其他文本。" ) # 准备当前design.json中该文件的条目信息 current_entry = None for f in self.design.files: if f.path == file_path: current_entry = f.model_dump() break user_prompt = f"文件路径: {file_path}\n文件内容:\n{file_content}\n\n当前design.json中该文件的条目(如果存在):\n{json.dumps(current_entry, indent=2) if current_entry else '无'}" try: result = self._call_llm(system_prompt, user_prompt, temperature=0.2) update_info = result # 查找或创建文件条目 file_model = None for f in self.design.files: if f.path == file_path: file_model = f break if file_model is None: # 创建新条目,包括design_updates new_file = FileModel( path=file_path, summary=update_info.get("summary", ""), dependencies=update_info.get("dependencies", []), functions=update_info.get("functions", []), classes=update_info.get("classes", []), design_updates=update_info.get("design_updates", {}), # 新增design_updates处理 ) self.design.files.append(new_file) logger.info(f"在design.json中创建了新文件条目: {file_path}") else: # 更新现有条目,使用merge_design_updates处理design_updates if 'design_updates' in update_info: file_model.merge_design_updates(update_info['design_updates']) # 更新其他字段 file_model.summary = update_info.get("summary", file_model.summary) file_model.dependencies = update_info.get( "dependencies", file_model.dependencies ) file_model.functions = update_info.get( "functions", file_model.functions ) file_model.classes = update_info.get("classes", file_model.classes) logger.info(f"更新了design.json中的文件条目: {file_path}") # 保存更新后的design.json design_path = self.output_dir / "design.json" with open(design_path, "w", encoding="utf-8") as f: json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) logger.info(f"design.json已更新,文件条目: {file_path}") self.console.print( f"[green]✅ design.json中文件条目 {file_path} 已更新[/green]" ) return True except Exception as e: logger.error(f"更新文件条目失败: {e}") self.console.print(f"[bold red]❌ 更新文件条目失败: {e}[/bold red]") return False def sync_readme(self) -> bool: """ 同步README.md和design.json,确保内容一致性 返回bool表示是否成功 """ logger.info("开始同步README.md和design.json") # 读取README.md readme_path = self.output_dir / "README.md" if not readme_path.exists(): logger.error(f"README.md不存在于 {self.output_dir}") self.console.print( f"[bold red]❌ README.md不存在于 {self.output_dir}[/bold red]" ) return False try: with open(readme_path, "r", encoding="utf-8") as f: readme_content = f.read() except Exception as e: logger.error(f"读取README.md失败: {e}") self.console.print(f"[bold red]❌ 读取README.md失败: {e}[/bold red]") return False # 加载design.json design_path = self.output_dir / "design.json" if not design_path.exists(): logger.error(f"design.json不存在于 {self.output_dir}") self.console.print( f"[bold red]❌ design.json不存在于 {self.output_dir}[/bold red]" ) return False try: with open(design_path, "r", encoding="utf-8") as f: design_data = json.load(f) design = DesignModel(**design_data) except Exception as e: logger.error(f"加载design.json失败: {e}") self.console.print(f"[bold red]❌ 加载design.json失败: {e}[/bold red]") return False # 调用LLM比较和同步 system_prompt = ( "你是一个软件架构师。比较README.md内容和design.json,识别不一致之处,并建议更新。" "返回严格的JSON对象,包含以下字段:\n" "- needs_update: bool, 是否需要更新\n" "- update_type: 'readme' 或 'design' 或 'both', 指示哪个需要更新\n" "- updates: 对象,描述具体的更新内容\n" "注意:仅返回JSON,不要其他文本。" ) user_prompt = f"README.md内容:\n{readme_content}\n\ndesign.json内容:\n{json.dumps(design.model_dump(), indent=2)}" try: result = self._call_llm(system_prompt, user_prompt, temperature=0.2) needs_update = result.get("needs_update", False) if not needs_update: logger.info("README.md和design.json已同步,无需更新") self.console.print( "[green]✅ README.md和design.json已同步,无需更新[/green]" ) return True update_type = result.get("update_type", "") updates = result.get("updates", {}) if update_type == "readme": # 更新README.md new_readme = updates.get("new_readme", readme_content) with open(readme_path, "w", encoding="utf-8") as f: f.write(new_readme) logger.info("已更新README.md") self.console.print("[green]✅ README.md已更新[/green]") elif update_type == "design": # 更新design.json new_design_data = updates.get("new_design", design.model_dump()) design = DesignModel(**new_design_data) with open(design_path, "w", encoding="utf-8") as f: json.dump(new_design_data, f, indent=2, ensure_ascii=False) logger.info("已更新design.json") self.console.print("[green]✅ design.json已更新[/green]") elif update_type == "both": # 更新两者 new_readme = updates.get("new_readme", readme_content) new_design_data = updates.get("new_design", design.model_dump()) with open(readme_path, "w", encoding="utf-8") as f: f.write(new_readme) design = DesignModel(**new_design_data) with open(design_path, "w", encoding="utf-8") as f: json.dump(new_design_data, f, indent=2, ensure_ascii=False) logger.info("已同步更新README.md和design.json") self.console.print("[green]✅ README.md和design.json已同步更新[/green]") else: logger.warning(f"未知的update_type: {update_type}") self.console.print( f"[yellow]⚠ 未知的update_type: {update_type}[/yellow]" ) return False return True except Exception as e: logger.error(f"同步README.md失败: {e}") self.console.print(f"[bold red]❌ 同步README.md失败: {e}[/bold red]") return False ```