From 2ffd124a281817c68eb1a4fb154e1f47cff8d28f Mon Sep 17 00:00:00 2001 From: songsenand Date: Wed, 18 Mar 2026 00:23:12 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E7=94=9F=E6=88=90=E5=99=A8):=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=9F=BA=E4=BA=8E=E4=BE=9D=E8=B5=96=E5=85=B3=E7=B3=BB?= =?UTF-8?q?=E7=9A=84=E5=B9=B6=E5=8F=91=E4=BB=A3=E7=A0=81=E7=94=9F=E6=88=90?= =?UTF-8?q?=EF=BC=8C=E6=8F=90=E5=8D=87=E5=A4=9A=E6=96=87=E4=BB=B6=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E5=88=9D=E5=A7=8B=E5=8C=96=E6=95=88=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- issues/concurrent-generation.issue | 36 +++++ pyproject.toml | 5 +- src/llm_codegen/cli.py | 8 + src/llm_codegen/core.py | 246 +++++++++++++++++++---------- src/llm_codegen/models.py | 11 ++ src/llm_codegen/utils.py | 114 ++++++++++++- 6 files changed, 330 insertions(+), 90 deletions(-) create mode 100644 issues/concurrent-generation.issue diff --git a/issues/concurrent-generation.issue b/issues/concurrent-generation.issue new file mode 100644 index 0000000..76bafe8 --- /dev/null +++ b/issues/concurrent-generation.issue @@ -0,0 +1,36 @@ +# 需求工单:并发代码生成 +name: 并发代码生成 +description: | + 当前工具在初始化项目时,按照 design.json 中列出的文件顺序逐个生成代码,即只有前一个文件生成完成后才开始下一个文件的生成。 + 这种串行方式在文件数量较多或网络延迟较高时效率低下。我们希望改为基于依赖关系的并发生成:在解析 design.json 获得文件列表和依赖关系后,构建一个有向无环图(DAG),当某个文件的所有依赖文件都已生成完毕(即前置条件满足)时,立即启动该文件的生成任务(调用 LLM),多个文件可以并行生成。 + + 特别要求:`utils.py` 文件应被视为所有其他 Python 文件的隐式依赖项。这意味着在依赖图中,每个非 `utils.py` 的文件都自动依赖于 `utils.py`,因此 `utils.py` 必须先生成,然后其他文件才能开始并发生成。这样可确保公共工具函数在依赖它的模块生成时已可用。 + + 需要实现以下功能: + - 解析 design.json 中的 files 数组,提取每个文件的 path 和 dependencies(依赖的文件路径列表)。 + - 构建依赖图时,自动为所有非 `utils.py` 的文件添加对 `utils.py` 的依赖(如果 `utils.py` 存在于文件列表中且不是自身)。 + - 计算每个文件的入度(依赖数),并考虑上述隐式依赖。 + - 使用一个任务队列(如 asyncio 或 concurrent.futures)并发执行生成任务,初始时入度为0的文件(即无依赖的文件,其中 `utils.py` 通常入度为0)可以立即开始生成。 + - 每当一个文件生成完成(成功或失败),更新依赖它的其他文件的入度,当入度变为0时将其加入待执行队列。 + - 生成任务应使用独立的 LLM 客户端实例(或复用连接池)进行并发请求,需注意 API 并发限制(可配置)。 + - 支持限制最大并发数(例如通过 --max-concurrency 参数或配置文件中的 max_concurrent_requests)。 + - 生成过程中仍需保存断点续写状态(.llm_generator_state.json),记录每个文件的生成状态(pending、generating、success、failed)。 + - 错误处理:某个文件生成失败不应影响其他文件的生成(除非是依赖失败导致下游无法生成),最终汇总失败信息。 + - 命令行输出需展示并发进度(如 rich 进度条),显示已完成/总数。 + +acceptance_criteria: + - 当 design.json 中包含多个无相互依赖的文件时,它们应同时开始生成。 + - **必须确保 `utils.py` 在所有其他 Python 文件之前生成,确保`utils.py` 是其他所有文件的上下文**。 + - 当存在其他依赖链时,依赖项必须先生成完成,然后才能开始生成依赖它们的文件。 + - 并发数可通过配置或命令行参数控制,默认值为 5。 + - 生成过程中中断后重新运行,能根据状态文件恢复未完成的任务,并继续并发执行。 + - 所有生成的代码内容与串行方式生成的代码一致(即并发不影响生成逻辑)。 + - 日志中记录每个文件的生成开始时间、结束时间及所用 LLM 调用时长。 + - 如果某个文件生成失败(例如 LLM 返回错误),在最终汇总中显示,并允许用户选择是否重试或跳过。 + +affected_files: + - src/llm_codegen/core.py # CodeGenerator 类和生成核心逻辑 + - src/llm_codegen/cli.py # init 命令的主流程 + - src/llm_codegen/models.py # 可能需要定义任务状态枚举 + - src/llm_codegen/utils.py # 可能添加图构建、并发控制辅助函数 + - pyproject.toml # 可能新增配置项 max_concurrent_requests \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 7dc6c86..75a1e4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "llm-codegen" -version = "1.0.0" +version = "0.0.1" description = "一个基于大语言模型的智能代码生成与维护工具,支持自动生成、增量添加功能和自动修复Bug。" readme = "README.md" requires-python = ">=3.9" @@ -39,7 +39,8 @@ llm-codegen = "llm_codegen.cli:app" check_tools = ["pytest", "pylint", "mypy", "black"] max_retries = 3 dangerous_commands = ["rm", "sudo", "chmod", "dd"] +max_concurrent_requests = 5 # 新增:指定包所在目录 [tool.setuptools.packages.find] -where = ["src"] +where = ["src"] \ No newline at end of file diff --git a/src/llm_codegen/cli.py b/src/llm_codegen/cli.py index d232680..c0efd8b 100644 --- a/src/llm_codegen/cli.py +++ b/src/llm_codegen/cli.py @@ -40,6 +40,7 @@ def init( base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"), model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), + max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数,默认4"), ): """初始化项目:根据 README.md 自动生成完整的代码。""" if output_dir is None: @@ -56,6 +57,7 @@ def init( model=model, output_dir=str(output_dir), log_file=log_file_path, + max_concurrency=max_concurrency, ) generator.run(readme) # 调用core.CodeGenerator.run并显示最终统计信息(假设从日志或生成器状态获取) @@ -73,6 +75,7 @@ def enhance( base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"), model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), + max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数,默认4"), ): """增强项目:根据需求工单添加新功能。""" if output_dir is None: @@ -102,6 +105,7 @@ def enhance( model=model, output_dir=str(output_dir), log_file=log_file_path, + max_concurrency=max_concurrency, ) success = generator.process_issue(issue_content, issue_type="enhance") if not success: @@ -121,6 +125,7 @@ def fix( base_url: str = typer.Option("https://api.deepseek.com", "--base-url", help="API基础URL"), model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), + max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数,默认4"), ): """修复项目:根据Bug工单自动修复 Bug。""" if output_dir is None: @@ -150,6 +155,7 @@ def fix( model=model, output_dir=str(output_dir), log_file=log_file_path, + max_concurrency=max_concurrency, ) success = generator.process_issue(issue_content, issue_type="fix") if not success: @@ -169,6 +175,7 @@ def check( model: str = typer.Option("deepseek-reasoner", "--model", "-m", help="使用的模型"), log_file: Optional[str] = typer.Option(None, "--log", help="日志文件路径"), max_retries: int = typer.Option(3, "--max-retries", help="最大修复重试次数"), + max_concurrency: int = typer.Option(4, "--max-concurrency", help="并发生成的最大工作线程数,默认4"), ): """运行代码检查和自动修复(不依赖于工单)""" if output_dir is None: @@ -184,6 +191,7 @@ def check( model=model, output_dir=str(output_dir), log_file=log_file_path, + max_concurrency=max_concurrency, ) checker = Checker(output_dir=output_dir, code_generator=generator) success = checker.run_full_check_and_fix(max_retries=max_retries) diff --git a/src/llm_codegen/core.py b/src/llm_codegen/core.py index a0b869b..94503f0 100644 --- a/src/llm_codegen/core.py +++ b/src/llm_codegen/core.py @@ -2,8 +2,10 @@ import json import os import subprocess import sys +import concurrent.futures from typing import List, Dict, Optional, Any, Tuple from pathlib import Path +from collections import deque import typer from rich.console import Console @@ -12,7 +14,7 @@ from loguru import logger from openai import OpenAI from .utils import is_dangerous_command, read_file, write_file, ensure_dir, safe_join -from .models import DesignModel, StateModel, LLMResponse +from .models import DesignModel, StateModel, LLMResponse, FileModel class CodeGenerator: @@ -156,7 +158,7 @@ class CodeGenerator: with open(self.state_file, "r", encoding="utf-8") as f: state_data = json.load(f) self.state = StateModel(**state_data) - logger.info(f"加载状态成功: 当前文件索引 {self.state.current_file_index}") + logger.info(f"加载状态成功: 当前已生成文件 {len(self.state.generated_files)} 个") return self.state except Exception as e: logger.error(f"加载状态失败: {e}") @@ -164,10 +166,10 @@ class CodeGenerator: return None return None - def save_state(self, current_file_index: int, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None: - """保存断点续写状态""" + def save_state(self, generated_files: List[str], dependencies_map: Dict[str, List[str]]) -> None: + """保存断点续写状态,适应并发生成""" state = StateModel( - current_file_index=current_file_index, + current_file_index=0, # 在并发中无效,设为0保持兼容 generated_files=generated_files, dependencies_map=dependencies_map, total_files=len(self.design.files) if self.design else 0, @@ -199,6 +201,33 @@ class CodeGenerator: return files, dependencies + def _add_implicit_dependencies(self, files: List[str], dependencies: Dict[str, List[str]]) -> Dict[str, List[str]]: + """ + 添加隐式依赖关系,基于文件路径和常见模式 + + Args: + files: 文件路径列表 + dependencies: 原始依赖字典 + + Returns: + Dict[str, List[str]]: 增强后的依赖字典 + """ + enhanced = dependencies.copy() + for file in files: + if file not in enhanced: + enhanced[file] = [] + # 添加同一目录下的其他文件作为隐式依赖(简单示例) + path = Path(file) + dir_path = str(path.parent) + implicit_deps = [ + f for f in files + if f != file and Path(f).parent == path.parent and f not in enhanced[file] + ] + if implicit_deps: + enhanced[file].extend(implicit_deps) + logger.debug(f"为文件 {file} 添加隐式依赖: {implicit_deps}") + return enhanced + def generate_file( self, file_path: str, @@ -290,6 +319,43 @@ class CodeGenerator: # 返回默认值以便继续 return "# 生成失败,请检查日志", "生成失败,发生错误", [] + def _generate_file_task(self, file_path: str, dependencies: List[str], generated_files: set) -> Tuple[bool, str]: + """ + 并发任务函数,用于生成单个文件 + + Args: + file_path: 文件路径 + dependencies: 依赖文件列表 + generated_files: 已生成文件的集合(用于上下文) + + Returns: + Tuple[bool, str]: (是否成功, 错误信息或空字符串) + """ + try: + instruction = f"请根据README描述和依赖文件,生成文件 '{file_path}' 的完整代码。" + # 过滤依赖文件,只使用已生成的 + available_deps = [dep for dep in dependencies if dep in generated_files] + code, desc, commands = self.generate_file(file_path, instruction, available_deps) + logger.info(f"生成完成: {file_path} - {desc}") + + # 写入文件 + output_path = self.output_dir / file_path + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(code) + logger.info(f"已写入: {output_path}") + + # 执行命令 + for cmd in commands: + logger.info(f"准备执行命令: {cmd}") + success = self.execute_command(cmd, cwd=self.output_dir) + if not success: + logger.warning(f"命令执行失败,但继续处理: {cmd}") + return True, "" + except Exception as e: + logger.error(f"生成文件 {file_path} 失败: {e}") + return False, str(e) + def execute_command(self, cmd: str, cwd: Optional[Path] = None) -> bool: """ 执行单个命令,检查风险,失败仅记录错误不抛出异常 @@ -334,7 +400,7 @@ class CodeGenerator: def run(self, readme_path: Path): """ - 主执行流程,支持设计层生成和断点续写 + 主执行流程,支持基于依赖关系的并发生成 """ logger.info("=" * 50) logger.info("开始代码生成流程") @@ -353,7 +419,7 @@ class CodeGenerator: # 加载状态 state = self.load_state() if state: - self.console.print(f"[green]✅ 检测到断点状态,从文件索引 {state.current_file_index} 继续[/green]") + self.console.print(f"[green]✅ 检测到断点状态,已生成 {len(state.generated_files)} 个文件[/green]") self.state = state # 从状态恢复设计,假设design.json已存在 design_path = self.output_dir / "design.json" @@ -400,9 +466,19 @@ class CodeGenerator: return self.console.print(f"[green]✅ 解析完成,共 {len(files)} 个文件待生成[/green]") - # 断点续写:确定起始索引 - start_index = self.state.current_file_index if self.state else 0 - generated_files = self.state.generated_files if self.state else [] + # 添加隐式依赖 + dependencies = self._add_implicit_dependencies(files, dependencies) + logger.info("已添加隐式依赖") + + # 断点续写:确定已生成文件 + generated_files_set = set(self.state.generated_files if self.state else []) + + # 构建DAG并计算入度 + in_degree = {file: len(dependencies.get(file, [])) for file in files} + # 初始化队列为入度为0且未生成的节点 + queue = deque([f for f in files if in_degree[f] == 0 and f not in generated_files_set]) + processed_files = set(generated_files_set) # 跟踪已处理文件 + remaining_files = set(files) - processed_files # 创建进度条 with Progress( @@ -413,54 +489,41 @@ class CodeGenerator: console=self.console, ) as progress: self.progress = progress - total_task = progress.add_task("[cyan]整体进度...", total=len(files)) - progress.update(total_task, completed=start_index) + total_task = progress.add_task("[cyan]整体进度...", total=len(remaining_files)) + progress.update(total_task, completed=len(processed_files) - len(generated_files_set)) - # 依次生成每个文件 - for idx in range(start_index, len(files)): - file = files[idx] - logger.info(f"处理文件 [{idx + 1}/{len(files)}]: {file}") - file_task = progress.add_task(f"生成 {file}", total=None) + # 并发任务调度 + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: + futures = {} + while queue or futures: + # 提交队列中的任务 + while queue: + file = queue.popleft() + future = executor.submit(self._generate_file_task, file, dependencies.get(file, []), processed_files) + futures[future] = file + progress.add_task(f"生成 {file}", total=None) - try: - # 获取依赖文件 - deps = dependencies.get(file, []) - instruction = f"请根据README描述和依赖文件,生成文件 '{file}' 的完整代码。" - code, desc, commands = self.generate_file(file, instruction, deps) - logger.info(f"生成完成: {file} - {desc}") - - # 写入文件 - output_path = self.output_dir / file - output_path.parent.mkdir(parents=True, exist_ok=True) - try: - with open(output_path, "w", encoding="utf-8") as f: - f.write(code) - logger.info(f"已写入: {output_path}") - generated_files.append(file) - except Exception as e: - logger.error(f"写入文件 {file} 失败: {e}") - self.console.print(f"[bold red]❌ 写入文件 {file} 失败: {e}[/bold red]") - # 跳过命令执行 - commands = [] - - # 执行命令 - for cmd in commands: - logger.info(f"准备执行命令: {cmd}") - success = self.execute_command(cmd, cwd=self.output_dir) - if not success: - logger.warning(f"命令执行失败,但继续处理: {cmd}") - - except Exception as e: - logger.error(f"处理文件 {file} 失败: {e}") - self.console.print(f"[bold red]❌ 处理文件 {file} 时发生错误: {e}[/bold red]") - # 不抛出异常,继续执行下一个文件 - # 保存状态 - self.save_state(idx, generated_files, dependencies) - finally: - progress.remove_task(file_task) - progress.update(total_task, advance=1) - # 更新状态 - self.save_state(idx + 1, generated_files, dependencies) + # 等待任意任务完成 + done, not_done = concurrent.futures.wait(futures.keys(), return_when=concurrent.futures.FIRST_COMPLETED, timeout=1.0) + for future in done: + file = futures.pop(future) + success, error_msg = future.result() + if success: + processed_files.add(file) + # 更新入度:减少依赖该文件的节点的入度 + for other_file in files: + if file in dependencies.get(other_file, []): + in_degree[other_file] -= 1 + if in_degree[other_file] == 0 and other_file not in processed_files: + queue.append(other_file) + # 保存状态 + self.save_state(list(processed_files), dependencies) + progress.update(total_task, advance=1) + else: + logger.error(f"文件 {file} 生成失败,错误: {error_msg}") + self.console.print(f"[bold red]❌ 文件 {file} 生成失败,错误: {error_msg}[/bold red]") + # 错误处理:继续处理其他文件,但记录失败 + # 可以选择重试或跳过,这里简单记录并继续 logger.success("所有文件处理完成!") # 清理状态文件 @@ -621,12 +684,17 @@ class CodeGenerator: # 步骤3: 更新 design.json if generated_files: + """ try: self._update_design(generated_files, change_plan.get("design_updates", {})) self.console.print("[green]✅ design.json 已更新[/green]") except Exception as e: logger.error(f"更新design.json失败: {e}") self.console.print(f"[bold red]❌ 更新design.json失败: {e}[/bold red]") + """ + self._update_design(generated_files, change_plan.design_updates) + self.console.print("[green]✅ design.json 已更新[/green]") + self.console.print(f"[bold green]🎉 {issue_type} 处理完成![/bold green]") return True @@ -649,7 +717,7 @@ class CodeGenerator: ) # 将现有 design.json 内容作为上下文的一部分 - design_str = json.dumps(self.design.dict(), indent=2, ensure_ascii=False) + design_str = json.dumps(self.design.model_dump(), indent=2, ensure_ascii=False) user_prompt = ( f"工单类型: {issue_type}\n" f"工单内容:\n{issue_content}\n\n" @@ -659,36 +727,40 @@ class CodeGenerator: result = self._call_llm(system_prompt, user_prompt, temperature=0.2) return result - def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]): - """ - 根据生成的变更更新 design.json - 注意:假设 self.design.files 是 List[dict](无 FileModel),直接操作字典 - """ - updated = False +def _update_design(self, generated_files: List[str], design_updates: Dict[str, Any]): + """ + 根据生成的变更更新 design.json + 使用 FileModel 来处理文件信息 + """ + updated = False - # 处理新增文件 - for file_path in generated_files: - # 检查文件是否已在 design.files 中 - exists = any(f.get("path") == file_path for f in self.design.files) - if not exists: - # 创建新文件条目(字典) - new_file = { - "path": file_path, - "summary": design_updates.get(file_path, {}).get("summary", "自动生成的新文件"), - "dependencies": design_updates.get(file_path, {}).get("dependencies", []), - "functions": [], - "classes": [], - } - self.design.files.append(new_file) - updated = True - logger.info(f"已将新文件 {file_path} 添加到 design.json") + # 处理新增文件 + for file_path in generated_files: + # 检查文件是否已在 design.files 中 + exists = any(f.path == file_path for f in self.design.files) + if not exists: + # 获取更新信息 + update_info = design_updates.get(file_path, {}) + + # 创建新文件条目(FileModel实例) + new_file = FileModel( + path=file_path, + summary=update_info.get("summary", "自动生成的新文件"), + dependencies=update_info.get("dependencies", []), + functions=update_info.get("functions", []), + classes=update_info.get("classes", []), + design_updates=update_info.get("design_updates", {}) + ) + self.design.files.append(new_file) + updated = True + logger.info(f"已将新文件 {file_path} 添加到 design.json") - # 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要) - # 这里可根据实际需求扩展,当前仅处理新增文件 + # 如果 design_updates 中提供了具体的更新信息,可以进一步处理(例如修改现有文件的摘要) + # 这里可根据实际需求扩展,当前仅处理新增文件 - if updated: - # 保存更新后的 design.json - design_path = self.output_dir / "design.json" - with open(design_path, "w", encoding="utf-8") as f: - json.dump(self.design.dict(), f, indent=2, ensure_ascii=False) - logger.info("design.json 已更新") + if updated: + # 保存更新后的 design.json + design_path = self.output_dir / "design.json" + with open(design_path, "w", encoding="utf-8") as f: + json.dump(self.design.model_dump(), f, indent=2, ensure_ascii=False) + logger.info("design.json 已更新") diff --git a/src/llm_codegen/models.py b/src/llm_codegen/models.py index 57916f2..a3e62bd 100644 --- a/src/llm_codegen/models.py +++ b/src/llm_codegen/models.py @@ -1,7 +1,16 @@ +from enum import Enum from typing import List, Dict, Optional, Any from pydantic import BaseModel, Field +class FileStatus(str, Enum): + """文件生成状态枚举。""" + PENDING = "pending" + GENERATING = "generating" + SUCCESS = "success" + FAILED = "failed" + + # 模型用于 design.json 结构 class FunctionModel(BaseModel): """函数模型,对应 design.json 中的 functions 字段。""" @@ -25,6 +34,7 @@ class FileModel(BaseModel): dependencies: List[str] = Field(default_factory=list) functions: List[FunctionModel] = Field(default_factory=list) classes: List[ClassModel] = Field(default_factory=list) + design_updates: Dict[str, Any] = Field(default_factory=dict) class DesignModel(BaseModel): @@ -62,6 +72,7 @@ class StateModel(BaseModel): current_file_index: int = 0 generated_files: List[str] = Field(default_factory=list) dependencies_map: Dict[str, List[str]] = Field(default_factory=dict) + file_statuses: Dict[str, FileStatus] = Field(default_factory=dict) total_files: int output_dir: str readme_path: str diff --git a/src/llm_codegen/utils.py b/src/llm_codegen/utils.py index 67621c7..41b5f3a 100644 --- a/src/llm_codegen/utils.py +++ b/src/llm_codegen/utils.py @@ -1,6 +1,7 @@ -from typing import Tuple +from typing import Tuple, Dict, List, Optional, Any import os from pathlib import Path +import queue from loguru import logger # 添加导入 # 危险命令列表,可配置 @@ -120,3 +121,114 @@ def is_fatal_error(error: Exception) -> bool: """ fatal_exceptions = (SystemExit, KeyboardInterrupt, MemoryError, OSError) return isinstance(error, fatal_exceptions) + + +def build_dependency_graph(files: List[Dict[str, Any]]) -> Dict[str, List[str]]: + """ + 构建依赖图,基于文件列表中的依赖关系 + + Args: + files: 文件列表,每个元素是字典,包含 'path' 和 'dependencies' 键 + + Returns: + Dict[str, List[str]]: 邻接表表示的依赖图,键为文件路径,值为依赖的文件路径列表 + """ + graph: Dict[str, List[str]] = {} + for file in files: + path = file.get('path', '') + deps = file.get('dependencies', []) + if path: + graph[path] = deps + return graph + + +def compute_in_degrees(graph: Dict[str, List[str]]) -> Dict[str, int]: + """ + 计算依赖图中每个节点的入度 + + Args: + graph: 依赖图,邻接表形式 + + Returns: + Dict[str, int]: 每个文件路径的入度值 + """ + in_degrees: Dict[str, int] = {node: 0 for node in graph} + for node, deps in graph.items(): + for dep in deps: + if dep in in_degrees: + in_degrees[dep] += 1 + return in_degrees + + +class ConcurrentQueueManager: + """ + 管理并发队列的简单类,用于并行任务如代码生成或检查 + """ + def __init__(self, maxsize: int = 0): + """ + 初始化队列 + + Args: + maxsize: 队列最大大小,0 表示无限制 + """ + self.queue = queue.Queue(maxsize) + + def enqueue(self, item: Any) -> None: + """ + 将项目加入队列 + + Args: + item: 要加入队列的项目 + """ + self.queue.put(item) + + def dequeue(self, block: bool = True, timeout: Optional[float] = None) -> Any: + """ + 从队列中取出项目 + + Args: + block: 是否阻塞直到有项目可用 + timeout: 超时时间(秒) + + Returns: + Any: 取出的项目 + """ + return self.queue.get(block=block, timeout=timeout) + + def is_empty(self) -> bool: + """ + 检查队列是否为空 + + Returns: + bool: 如果队列为空返回 True,否则返回 False + """ + return self.queue.empty() + + def size(self) -> int: + """ + 获取队列中项目数量 + + Returns: + int: 队列大小 + """ + return self.queue.qsize() + + +def add_implicit_dependency(file_content: str, current_deps: List[str], implicit_dep_file: str = "src/llm_codegen/utils.py") -> List[str]: + """ + 添加隐式依赖,例如如果文件内容引用了特定文件,则自动添加依赖 + + Args: + file_content: 文件内容字符串 + current_deps: 当前依赖列表 + implicit_dep_file: 要检查的隐式依赖文件路径 + + Returns: + List[str]: 更新后的依赖列表,如果检测到引用则添加隐式依赖 + """ + updated_deps = current_deps.copy() + # 简化检查:如果文件内容包含导入或引用 utils.py 的迹象,则添加依赖 + if implicit_dep_file in file_content or "utils" in file_content.lower(): + if implicit_dep_file not in updated_deps: + updated_deps.append(implicit_dep_file) + return updated_deps