Cleaning Data Returned by Report Engine's LLM

This commit is contained in:
马一丁
2025-11-17 15:39:02 +08:00
parent 26c133c998
commit 50b6ab403e
6 changed files with 985 additions and 64 deletions

View File

@@ -14,6 +14,7 @@ from ..prompts import (
SYSTEM_PROMPT_DOCUMENT_LAYOUT,
build_document_layout_prompt,
)
from ..utils.json_parser import RobustJSONParser, JSONParseError
from .base_node import BaseNode
@@ -27,6 +28,12 @@ class DocumentLayoutNode(BaseNode):
def __init__(self, llm_client):
"""记录LLM客户端并设置节点名字供BaseNode日志使用"""
super().__init__(llm_client, "DocumentLayoutNode")
# 初始化鲁棒JSON解析器启用所有修复策略
self.json_parser = RobustJSONParser(
enable_json_repair=True,
enable_llm_repair=False, # 可以根据需要启用LLM修复
max_repair_attempts=3,
)
def run(
self,
@@ -82,8 +89,14 @@ class DocumentLayoutNode(BaseNode):
"""
解析LLM返回的JSON文本若失败则抛出友好错误。
使用鲁棒JSON解析器进行多重修复尝试
1. 清理markdown标记和思考内容
2. 本地语法修复(括号平衡、逗号补全、控制字符转义等)
3. 使用json_repair库进行高级修复
4. 可选的LLM辅助修复
参数:
raw: LLM原始返回字符串允许带```包裹。
raw: LLM原始返回字符串允许带```包裹、思考内容等
返回:
dict: 结构化的设计稿。
@@ -91,19 +104,25 @@ class DocumentLayoutNode(BaseNode):
异常:
ValueError: 当响应为空或JSON解析失败时抛出。
"""
cleaned = raw.strip()
if cleaned.startswith("```json"):
cleaned = cleaned[7:]
if cleaned.startswith("```"):
cleaned = cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
cleaned = cleaned.strip()
if not cleaned:
raise ValueError("文档设计LLM返回空内容")
try:
return json.loads(cleaned)
except json.JSONDecodeError as exc:
result = self.json_parser.parse(
raw,
context_name="文档设计",
expected_keys=["title", "toc", "hero"],
)
# 验证关键字段的类型
if not isinstance(result.get("title"), str):
logger.warning("文档设计缺少title字段或类型错误使用默认值")
result.setdefault("title", "未命名报告")
if not isinstance(result.get("toc"), (list, dict)):
logger.warning("文档设计缺少toc字段或类型错误使用空列表")
result.setdefault("toc", [])
if not isinstance(result.get("hero"), dict):
logger.warning("文档设计缺少hero字段或类型错误使用空对象")
result.setdefault("hero", {})
return result
except JSONParseError as exc:
# 转换为原有的异常类型以保持向后兼容
raise ValueError(f"文档设计JSON解析失败: {exc}") from exc

View File

@@ -12,6 +12,7 @@ from loguru import logger
from .base_node import BaseNode
from ..prompts import SYSTEM_PROMPT_TEMPLATE_SELECTION
from ..utils.json_parser import RobustJSONParser, JSONParseError
class TemplateSelectionNode(BaseNode):
@@ -25,13 +26,19 @@ class TemplateSelectionNode(BaseNode):
def __init__(self, llm_client, template_dir: str = "ReportEngine/report_template"):
"""
初始化模板选择节点
Args:
llm_client: LLM客户端
template_dir: 模板目录路径
"""
super().__init__(llm_client, "TemplateSelectionNode")
self.template_dir = template_dir
# 初始化鲁棒JSON解析器启用所有修复策略
self.json_parser = RobustJSONParser(
enable_json_repair=True,
enable_llm_repair=False,
max_repair_attempts=3,
)
def run(self, input_data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
"""
@@ -137,20 +144,22 @@ class TemplateSelectionNode(BaseNode):
# 调用LLM
response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_TEMPLATE_SELECTION, user_message)
# 检查响应是否为空
if not response or not response.strip():
logger.error("LLM返回空响应")
return None
logger.info(f"LLM原始响应: {response}")
# 尝试解析JSON响应
# 尝试解析JSON响应,使用鲁棒解析器
try:
# 清理响应文本
cleaned_response = self._clean_llm_response(response)
result = json.loads(cleaned_response)
result = self.json_parser.parse(
response,
context_name="模板选择",
expected_keys=["template_name", "selection_reason"],
)
# 验证选择的模板是否存在
selected_template_name = result.get('template_name', '')
for template in available_templates:
@@ -161,38 +170,16 @@ class TemplateSelectionNode(BaseNode):
'template_content': template['content'],
'selection_reason': result.get('selection_reason', 'LLM智能选择')
}
logger.error(f"LLM选择的模板不存在: {selected_template_name}")
return None
except json.JSONDecodeError as e:
except JSONParseError as e:
logger.error(f"JSON解析失败: {str(e)}")
# 尝试从文本响应中提取模板信息
return self._extract_template_from_text(response, available_templates)
def _clean_llm_response(self, response: str) -> str:
"""
清理LLM响应。
去掉 ```json``` 包裹以及前后空白,方便 `json.loads`。
参数:
response: LLM原始响应。
返回:
str: 适合直接做JSON解析的纯文本。
"""
# 移除可能的markdown代码块标记
if '```json' in response:
response = response.split('```json')[1].split('```')[0]
elif '```' in response:
response = response.split('```')[1].split('```')[0]
# 移除前后空白
response = response.strip()
return response
def _extract_template_from_text(self, response: str, available_templates: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""
从文本响应中提取模板信息。

View File

@@ -14,6 +14,7 @@ from ..prompts import (
SYSTEM_PROMPT_WORD_BUDGET,
build_word_budget_prompt,
)
from ..utils.json_parser import RobustJSONParser, JSONParseError
from .base_node import BaseNode
@@ -27,6 +28,12 @@ class WordBudgetNode(BaseNode):
def __init__(self, llm_client):
"""仅记录LLM客户端引用方便run阶段发起请求"""
super().__init__(llm_client, "WordBudgetNode")
# 初始化鲁棒JSON解析器启用所有修复策略
self.json_parser = RobustJSONParser(
enable_json_repair=True,
enable_llm_repair=False, # 可以根据需要启用LLM修复
max_repair_attempts=3,
)
def run(
self,
@@ -79,8 +86,14 @@ class WordBudgetNode(BaseNode):
"""
将LLM输出的JSON文本转为字典失败时提示规划异常。
使用鲁棒JSON解析器进行多重修复尝试
1. 清理markdown标记和思考内容
2. 本地语法修复(括号平衡、逗号补全、控制字符转义等)
3. 使用json_repair库进行高级修复
4. 可选的LLM辅助修复
参数:
raw: LLM返回值可能包含```包裹。
raw: LLM返回值可能包含```包裹、思考内容等
返回:
dict: 合法的篇幅规划JSON。
@@ -88,19 +101,25 @@ class WordBudgetNode(BaseNode):
异常:
ValueError: 当响应为空或JSON解析失败时抛出。
"""
cleaned = raw.strip()
if cleaned.startswith("```json"):
cleaned = cleaned[7:]
if cleaned.startswith("```"):
cleaned = cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
cleaned = cleaned.strip()
if not cleaned:
raise ValueError("篇幅规划LLM返回空内容")
try:
return json.loads(cleaned)
except json.JSONDecodeError as exc:
result = self.json_parser.parse(
raw,
context_name="篇幅规划",
expected_keys=["totalWords", "globalGuidelines", "chapters"],
)
# 验证关键字段的类型
if not isinstance(result.get("totalWords"), (int, float)):
logger.warning("篇幅规划缺少totalWords字段或类型错误使用默认值")
result.setdefault("totalWords", 10000)
if not isinstance(result.get("globalGuidelines"), list):
logger.warning("篇幅规划缺少globalGuidelines字段或类型错误使用空列表")
result.setdefault("globalGuidelines", [])
if not isinstance(result.get("chapters"), (list, dict)):
logger.warning("篇幅规划缺少chapters字段或类型错误使用空列表")
result.setdefault("chapters", [])
return result
except JSONParseError as exc:
# 转换为原有的异常类型以保持向后兼容
raise ValueError(f"篇幅规划JSON解析失败: {exc}") from exc

View File

@@ -216,8 +216,17 @@ SYSTEM_PROMPT_TEMPLATE_SELECTION = f"""
{json.dumps(output_schema_template_selection, indent=2, ensure_ascii=False)}
</OUTPUT JSON SCHEMA>
确保输出是一个符合上述输出JSON模式定义的JSON对象。
只返回JSON对象不要有解释或额外文本。
**重要的输出格式要求:**
1. 只返回符合上述Schema的纯JSON对象
2. 严禁在JSON外添加任何思考过程、说明文字或解释
3. 可以使用```json和```标记包裹JSON但不要添加其他内容
4. 确保JSON语法完全正确
- 对象和数组元素之间必须有逗号分隔
- 字符串中的特殊字符必须正确转义(\n, \t, \"等)
- 括号必须成对且正确嵌套
- 不要使用尾随逗号(最后一个元素后不加逗号)
- 不要在JSON中添加注释
5. 所有字符串值使用双引号,数值不使用引号
"""
# HTML报告生成的系统提示词
@@ -372,7 +381,17 @@ SYSTEM_PROMPT_DOCUMENT_LAYOUT = f"""
{json.dumps(document_layout_output_schema, ensure_ascii=False, indent=2)}
</OUTPUT JSON SCHEMA>
只返回JSON勿附加额外文本。
**重要的输出格式要求:**
1. 只返回符合上述Schema的纯JSON对象
2. 严禁在JSON外添加任何思考过程、说明文字或解释
3. 可以使用```json和```标记包裹JSON但不要添加其他内容
4. 确保JSON语法完全正确
- 对象和数组元素之间必须有逗号分隔
- 字符串中的特殊字符必须正确转义(\n, \t, \"等)
- 括号必须成对且正确嵌套
- 不要使用尾随逗号(最后一个元素后不加逗号)
- 不要在JSON中添加注释
5. 所有字符串值使用双引号,数值不使用引号
"""
# 篇幅规划提示词
@@ -390,7 +409,17 @@ SYSTEM_PROMPT_WORD_BUDGET = f"""
{json.dumps(word_budget_output_schema, ensure_ascii=False, indent=2)}
</OUTPUT JSON SCHEMA>
只返回JSON无额外说明。
**重要的输出格式要求:**
1. 只返回符合上述Schema的纯JSON对象
2. 严禁在JSON外添加任何思考过程、说明文字或解释
3. 可以使用```json和```标记包裹JSON但不要添加其他内容
4. 确保JSON语法完全正确
- 对象和数组元素之间必须有逗号分隔
- 字符串中的特殊字符必须正确转义(\n, \t, \"等)
- 括号必须成对且正确嵌套
- 不要使用尾随逗号(最后一个元素后不加逗号)
- 不要在JSON中添加注释
5. 所有字符串值使用双引号,数值不使用引号
"""

View File

@@ -0,0 +1,632 @@
"""
统一的JSON解析和修复工具。
提供鲁棒的JSON解析能力支持
1. 自动清理markdown代码块标记和思考内容
2. 本地语法修复(括号平衡、逗号补全、控制字符转义等)
3. 使用json_repair库进行高级修复
4. LLM辅助修复可选
5. 详细的错误日志和调试信息
"""
from __future__ import annotations
import json
import re
from typing import Any, Dict, List, Optional, Tuple, Callable
from loguru import logger
try:
from json_repair import repair_json as _json_repair_fn
except ImportError:
_json_repair_fn = None
class JSONParseError(ValueError):
"""JSON解析失败时抛出的异常附带原始文本方便排查。"""
def __init__(self, message: str, raw_text: Optional[str] = None):
"""
构造异常并附加原始输出,便于日志中定位。
Args:
message: 人类可读的错误描述。
raw_text: 触发异常的完整LLM输出。
"""
super().__init__(message)
self.raw_text = raw_text
class RobustJSONParser:
"""
鲁棒的JSON解析器。
集成多种修复策略确保LLM返回的内容能够被正确解析
- 清理markdown包裹、思考内容等额外信息
- 修复常见语法错误(缺少逗号、括号不平衡等)
- 转义未转义的控制字符
- 使用第三方库进行高级修复
- 可选的LLM辅助修复
"""
# 常见的LLM思考内容模式
_THINKING_PATTERNS = [
r"<thinking>.*?</thinking>",
r"<thought>.*?</thought>",
r"让我想想.*?(?=\{|\[|$)",
r"首先.*?(?=\{|\[|$)",
r"分析.*?(?=\{|\[|$)",
r"根据.*?(?=\{|\[|$)",
]
# 冒号等号模式LLM常见错误
_COLON_EQUALS_PATTERN = re.compile(r'(":\s*)=')
def __init__(
self,
llm_repair_fn: Optional[Callable[[str, str], Optional[str]]] = None,
enable_json_repair: bool = True,
enable_llm_repair: bool = False,
max_repair_attempts: int = 3,
):
"""
初始化JSON解析器。
Args:
llm_repair_fn: 可选的LLM修复函数接收(原始JSON, 错误信息)返回修复后的JSON
enable_json_repair: 是否启用json_repair库
enable_llm_repair: 是否启用LLM辅助修复
max_repair_attempts: 最大修复尝试次数
"""
self.llm_repair_fn = llm_repair_fn
self.enable_json_repair = enable_json_repair and _json_repair_fn is not None
self.enable_llm_repair = enable_llm_repair
self.max_repair_attempts = max_repair_attempts
def parse(
self,
raw_text: str,
context_name: str = "JSON",
expected_keys: Optional[List[str]] = None,
extract_wrapper_key: Optional[str] = None,
) -> Dict[str, Any]:
"""
解析LLM返回的JSON文本。
参数:
raw_text: LLM原始输出可能包含```包裹、思考内容等)
context_name: 上下文名称,用于错误信息
expected_keys: 期望的键列表,用于验证
extract_wrapper_key: 如果JSON被包裹在某个键中指定该键名进行提取
返回:
dict: 解析后的JSON对象
异常:
JSONParseError: 多种修复策略仍无法解析合法JSON
"""
if not raw_text or not raw_text.strip():
raise JSONParseError(f"{context_name}返回空内容")
# 步骤1: 清理markdown标记和思考内容
cleaned = self._clean_response(raw_text)
# 步骤2: 收集候选payload
candidates = [cleaned]
# 步骤3: 应用本地修复策略
local_repaired = self._apply_local_repairs(cleaned)
if local_repaired != cleaned:
candidates.append(local_repaired)
# 步骤4: 尝试解析所有候选
last_error: Optional[json.JSONDecodeError] = None
for i, candidate in enumerate(candidates):
try:
data = json.loads(candidate)
logger.debug(f"{context_name} JSON解析成功候选{i + 1}/{len(candidates)}")
return self._extract_and_validate(
data, expected_keys, extract_wrapper_key, context_name
)
except json.JSONDecodeError as exc:
last_error = exc
logger.debug(f"{context_name} 候选{i + 1}解析失败: {exc}")
# 步骤5: 使用json_repair库
if self.enable_json_repair:
repaired = self._attempt_json_repair(cleaned, context_name)
if repaired:
try:
data = json.loads(repaired)
logger.info(f"{context_name} JSON通过json_repair库修复成功")
return self._extract_and_validate(
data, expected_keys, extract_wrapper_key, context_name
)
except json.JSONDecodeError as exc:
last_error = exc
logger.debug(f"{context_name} json_repair修复后仍无法解析: {exc}")
# 步骤6: 使用LLM修复如果启用
if self.enable_llm_repair and self.llm_repair_fn:
llm_repaired = self._attempt_llm_repair(cleaned, str(last_error), context_name)
if llm_repaired:
try:
data = json.loads(llm_repaired)
logger.info(f"{context_name} JSON通过LLM修复成功")
return self._extract_and_validate(
data, expected_keys, extract_wrapper_key, context_name
)
except json.JSONDecodeError as exc:
last_error = exc
logger.warning(f"{context_name} LLM修复后仍无法解析: {exc}")
# 所有策略都失败了
error_msg = f"{context_name} JSON解析失败: {last_error}"
logger.error(error_msg)
logger.debug(f"原始文本前500字符: {raw_text[:500]}")
raise JSONParseError(error_msg, raw_text=raw_text) from last_error
def _clean_response(self, raw: str) -> str:
"""
清理LLM响应去除markdown标记和思考内容。
参数:
raw: LLM原始输出
返回:
str: 清理后的文本
"""
cleaned = raw.strip()
# 移除思考内容(多语言支持)
for pattern in self._THINKING_PATTERNS:
cleaned = re.sub(pattern, "", cleaned, flags=re.DOTALL | re.IGNORECASE)
# 移除markdown代码块标记
if cleaned.startswith("```json"):
cleaned = cleaned[7:]
elif cleaned.startswith("```"):
cleaned = cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
cleaned = cleaned.strip()
# 尝试提取第一个完整的JSON对象或数组
cleaned = self._extract_first_json_structure(cleaned)
return cleaned
def _extract_first_json_structure(self, text: str) -> str:
"""
从文本中提取第一个完整的JSON对象或数组。
这对于处理LLM在JSON前后添加说明文字的情况很有用。
参数:
text: 可能包含JSON的文本
返回:
str: 提取的JSON文本如果找不到则返回原文本
"""
# 查找第一个 { 或 [
start_brace = text.find("{")
start_bracket = text.find("[")
if start_brace == -1 and start_bracket == -1:
return text
# 确定起始位置
if start_brace == -1:
start = start_bracket
opener = "["
closer = "]"
elif start_bracket == -1:
start = start_brace
opener = "{"
closer = "}"
else:
start = min(start_brace, start_bracket)
opener = text[start]
closer = "}" if opener == "{" else "]"
# 查找对应的结束位置
depth = 0
in_string = False
escaped = False
for i in range(start, len(text)):
ch = text[i]
if escaped:
escaped = False
continue
if ch == "\\":
escaped = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch in "{[":
depth += 1
elif ch in "}]":
depth -= 1
if depth == 0:
return text[start : i + 1]
# 如果没找到完整的结构,返回从起始位置到结尾
return text[start:] if start < len(text) else text
def _apply_local_repairs(self, text: str) -> str:
"""
应用本地修复策略。
参数:
text: 原始JSON文本
返回:
str: 修复后的文本
"""
repaired = text
mutated = False
# 修复 ":=" 错误
new_text = self._COLON_EQUALS_PATTERN.sub(r"\1", repaired)
if new_text != repaired:
logger.warning("检测到\":=\"字符,已自动移除多余的'='")
repaired = new_text
mutated = True
# 转义控制字符
repaired, escaped = self._escape_control_characters(repaired)
if escaped:
logger.warning("检测到未转义的控制字符,已自动转换为转义序列")
mutated = True
# 修复缺少的逗号
repaired, commas_fixed = self._fix_missing_commas(repaired)
if commas_fixed:
logger.warning("检测到对象/数组之间缺少逗号,已自动补齐")
mutated = True
# 平衡括号
repaired, balanced = self._balance_brackets(repaired)
if balanced:
logger.warning("检测到括号不平衡,已自动补齐/剔除异常括号")
mutated = True
# 移除尾随逗号
repaired, trailing_removed = self._remove_trailing_commas(repaired)
if trailing_removed:
logger.warning("检测到尾随逗号,已自动移除")
mutated = True
return repaired if mutated else text
def _escape_control_characters(self, text: str) -> Tuple[str, bool]:
"""
将字符串字面量中的裸换行/制表符/控制字符替换为JSON合法的转义序列。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
result: List[str] = []
in_string = False
escaped = False
mutated = False
control_map = {"\n": "\\n", "\r": "\\r", "\t": "\\t"}
for ch in text:
if escaped:
result.append(ch)
escaped = False
continue
if ch == "\\":
result.append(ch)
escaped = True
continue
if ch == '"':
result.append(ch)
in_string = not in_string
continue
if in_string and ch in control_map:
result.append(control_map[ch])
mutated = True
continue
if in_string and ord(ch) < 0x20:
result.append(f"\\u{ord(ch):04x}")
mutated = True
continue
result.append(ch)
return "".join(result), mutated
def _fix_missing_commas(self, text: str) -> Tuple[str, bool]:
"""
在对象/数组元素之间自动补逗号。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
chars: List[str] = []
mutated = False
in_string = False
escaped = False
length = len(text)
i = 0
while i < length:
ch = text[i]
chars.append(ch)
if escaped:
escaped = False
i += 1
continue
if ch == "\\":
escaped = True
i += 1
continue
if ch == '"':
# 如果我们正在退出字符串,检查后面是否需要逗号
if in_string:
# 查找下一个非空白字符
j = i + 1
while j < length and text[j] in " \t\r\n":
j += 1
# 如果下一个字符是 " { [ 或数字,可能需要逗号
if j < length:
next_ch = text[j]
if next_ch in "\"[{" or next_ch.isdigit():
# 检查是否已经在对象或数组中
# 通过检查前面是否有未闭合的 { 或 [
has_opener = False
for k in range(len(chars) - 1, -1, -1):
if chars[k] in "{[":
has_opener = True
break
elif chars[k] in "]}":
break
if has_opener:
chars.append(",")
mutated = True
in_string = not in_string
i += 1
continue
# 在 } 或 ] 后面检查是否需要逗号
if not in_string and ch in "}]":
j = i + 1
# 跳过空白
while j < length and text[j] in " \t\r\n":
j += 1
# 如果下一个非空白字符是 { [ " 或数字,添加逗号
if j < length:
next_ch = text[j]
if next_ch in "{[\"" or next_ch.isdigit():
chars.append(",")
mutated = True
i += 1
return "".join(chars), mutated
def _balance_brackets(self, text: str) -> Tuple[str, bool]:
"""
尝试修复因LLM多写/少写括号导致的不平衡结构。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
result: List[str] = []
stack: List[str] = []
mutated = False
in_string = False
escaped = False
opener_map = {"{": "}", "[": "]"}
for ch in text:
if escaped:
result.append(ch)
escaped = False
continue
if ch == "\\":
result.append(ch)
escaped = True
continue
if ch == '"':
result.append(ch)
in_string = not in_string
continue
if in_string:
result.append(ch)
continue
if ch in "{[":
stack.append(ch)
result.append(ch)
continue
if ch in "}]":
if stack and (
(ch == "}" and stack[-1] == "{") or (ch == "]" and stack[-1] == "[")
):
stack.pop()
result.append(ch)
else:
# 不匹配的闭括号,忽略
mutated = True
continue
result.append(ch)
# 补齐未闭合的括号
while stack:
opener = stack.pop()
result.append(opener_map[opener])
mutated = True
return "".join(result), mutated
def _remove_trailing_commas(self, text: str) -> Tuple[str, bool]:
"""
移除JSON对象和数组中的尾随逗号。
参数:
text: 原始JSON文本
返回:
Tuple[str, bool]: (修复后的文本, 是否有修改)
"""
if not text:
return text, False
# 使用正则表达式移除尾随逗号
# 匹配 , 后面跟着空白和 } 或 ] 的情况
pattern = r",(\s*[}\]])"
new_text = re.sub(pattern, r"\1", text)
return new_text, new_text != text
def _attempt_json_repair(self, text: str, context_name: str) -> Optional[str]:
"""
使用json_repair库进行高级修复。
参数:
text: 原始JSON文本
context_name: 上下文名称
返回:
Optional[str]: 修复后的JSON文本失败返回None
"""
if not _json_repair_fn:
return None
try:
fixed = _json_repair_fn(text)
if fixed and fixed != text:
logger.info(f"{context_name} 使用json_repair库自动修复JSON")
return fixed
except Exception as exc:
logger.debug(f"{context_name} json_repair修复失败: {exc}")
return None
def _attempt_llm_repair(
self, text: str, error_msg: str, context_name: str
) -> Optional[str]:
"""
使用LLM进行JSON修复。
参数:
text: 原始JSON文本
error_msg: 解析错误信息
context_name: 上下文名称
返回:
Optional[str]: 修复后的JSON文本失败返回None
"""
if not self.llm_repair_fn:
return None
try:
logger.info(f"{context_name} 尝试使用LLM修复JSON")
repaired = self.llm_repair_fn(text, error_msg)
if repaired and repaired != text:
return repaired
except Exception as exc:
logger.warning(f"{context_name} LLM修复失败: {exc}")
return None
def _extract_and_validate(
self,
data: Any,
expected_keys: Optional[List[str]],
extract_wrapper_key: Optional[str],
context_name: str,
) -> Dict[str, Any]:
"""
提取并验证JSON数据。
参数:
data: 解析后的数据
expected_keys: 期望的键列表
extract_wrapper_key: 包裹键名
context_name: 上下文名称
返回:
Dict[str, Any]: 提取并验证后的数据
异常:
JSONParseError: 如果数据格式不符合预期
"""
# 提取包裹的数据
if extract_wrapper_key and isinstance(data, dict):
if extract_wrapper_key in data:
data = data[extract_wrapper_key]
else:
logger.warning(
f"{context_name} 未找到包裹键'{extract_wrapper_key}',使用原始数据"
)
# 验证数据类型
if not isinstance(data, dict):
if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict):
logger.warning(f"{context_name} 返回数组,自动提取第一个元素")
data = data[0]
else:
raise JSONParseError(
f"{context_name} 返回的不是JSON对象: {type(data).__name__}"
)
# 验证必需的键
if expected_keys:
missing_keys = [key for key in expected_keys if key not in data]
if missing_keys:
logger.warning(
f"{context_name} 缺少预期的键: {', '.join(missing_keys)}"
)
return data
__all__ = ["RobustJSONParser", "JSONParseError"]

View File

@@ -0,0 +1,235 @@
"""
测试RobustJSONParser的各种修复能力。
验证解析器能够处理:
1. 基本的markdown包裹
2. 思考内容清理
3. 缺少逗号的修复
4. 括号不平衡的修复
5. 控制字符转义
6. 尾随逗号移除
"""
import json
import unittest
from json_parser import RobustJSONParser, JSONParseError
class TestRobustJSONParser(unittest.TestCase):
"""测试鲁棒JSON解析器的各种修复策略。"""
def setUp(self):
"""初始化解析器。"""
self.parser = RobustJSONParser(
enable_json_repair=False, # 先测试本地修复
enable_llm_repair=False,
)
def test_basic_json(self):
"""测试解析基本的合法JSON。"""
json_str = '{"name": "test", "value": 123}'
result = self.parser.parse(json_str, "基本测试")
self.assertEqual(result["name"], "test")
self.assertEqual(result["value"], 123)
def test_markdown_wrapped(self):
"""测试解析被```json包裹的JSON。"""
json_str = """```json
{
"name": "test",
"value": 123
}
```"""
result = self.parser.parse(json_str, "Markdown包裹测试")
self.assertEqual(result["name"], "test")
self.assertEqual(result["value"], 123)
def test_thinking_content_removal(self):
"""测试清理思考内容。"""
json_str = """<thinking>让我想想如何构造这个JSON</thinking>
{
"name": "test",
"value": 123
}"""
result = self.parser.parse(json_str, "思考内容清理测试")
self.assertEqual(result["name"], "test")
self.assertEqual(result["value"], 123)
def test_missing_comma_fix(self):
"""测试修复缺少的逗号。"""
# 这是实际错误中常见的情况:数组元素之间缺少逗号
json_str = """{
"totalWords": 40000,
"globalGuidelines": [
"重点突出技术红利分配失衡"
"详略策略:技术创新"
],
"chapters": []
}"""
result = self.parser.parse(json_str, "缺少逗号修复测试")
self.assertEqual(len(result["globalGuidelines"]), 2)
def test_unbalanced_brackets(self):
"""测试修复括号不平衡。"""
# 缺少结束括号
json_str = """{
"name": "test",
"nested": {
"value": 123
}
""" # 缺少最外层的 }
result = self.parser.parse(json_str, "括号不平衡测试")
self.assertEqual(result["name"], "test")
self.assertEqual(result["nested"]["value"], 123)
def test_control_character_escape(self):
"""测试转义控制字符。"""
# JSON字符串中的裸换行符应该被转义
json_str = """{
"text": "这是第一行
这是第二行",
"value": 123
}"""
result = self.parser.parse(json_str, "控制字符转义测试")
# 确保换行符被正确处理
self.assertIn("第一行", result["text"])
self.assertIn("第二行", result["text"])
def test_trailing_comma_removal(self):
"""测试移除尾随逗号。"""
json_str = """{
"name": "test",
"value": 123,
"items": [1, 2, 3,],
}"""
result = self.parser.parse(json_str, "尾随逗号测试")
self.assertEqual(result["name"], "test")
self.assertEqual(len(result["items"]), 3)
def test_colon_equals_fix(self):
"""测试修复冒号等号错误。"""
json_str = """{
"name":= "test",
"value": 123
}"""
result = self.parser.parse(json_str, "冒号等号测试")
self.assertEqual(result["name"], "test")
def test_extract_first_json(self):
"""测试从文本中提取第一个JSON结构。"""
json_str = """这是一些说明文字下面是JSON
{
"name": "test",
"value": 123
}
后面还有一些其他文字"""
result = self.parser.parse(json_str, "提取JSON测试")
self.assertEqual(result["name"], "test")
self.assertEqual(result["value"], 123)
def test_complex_real_world_case(self):
"""测试真实世界的复杂案例(类似实际错误)。"""
# 模拟实际错误缺少逗号、有markdown包裹、有思考内容
json_str = """<thinking>我需要构造一个篇幅规划</thinking>
```json
{
"totalWords": 40000,
"tolerance": 2000,
"globalGuidelines": [
"重点突出技术红利分配失衡、人才流失与职业认同危机等结构性矛盾"
"详略策略:技术创新与传统技艺的碰撞"
"案例导向:优先引用真实数据和调研"
],
"chapters": [
{
"chapterId": "ch1",
"targetWords": 5000
}
]
}
```"""
result = self.parser.parse(json_str, "复杂真实案例测试")
self.assertEqual(result["totalWords"], 40000)
self.assertEqual(result["tolerance"], 2000)
self.assertEqual(len(result["globalGuidelines"]), 3)
self.assertEqual(len(result["chapters"]), 1)
def test_expected_keys_validation(self):
"""测试期望键的验证。"""
json_str = '{"name": "test"}'
# 不应该因为缺少键而失败,只是警告
result = self.parser.parse(
json_str, "键验证测试", expected_keys=["name", "value"]
)
self.assertIn("name", result)
def test_wrapper_key_extraction(self):
"""测试从包裹键中提取数据。"""
json_str = """{
"wrapper": {
"name": "test",
"value": 123
}
}"""
result = self.parser.parse(
json_str, "包裹键测试", extract_wrapper_key="wrapper"
)
self.assertEqual(result["name"], "test")
self.assertEqual(result["value"], 123)
def test_empty_input(self):
"""测试空输入。"""
with self.assertRaises(JSONParseError):
self.parser.parse("", "空输入测试")
def test_invalid_json_after_all_repairs(self):
"""测试所有修复策略都无法处理的情况。"""
# 这是一个严重损坏的JSON无法修复
json_str = "{完全不是JSON格式的内容###"
with self.assertRaises(JSONParseError):
self.parser.parse(json_str, "无法修复测试")
def run_manual_test():
"""手动运行测试,打印详细信息。"""
print("=" * 60)
print("开始测试RobustJSONParser")
print("=" * 60)
parser = RobustJSONParser(enable_json_repair=False, enable_llm_repair=False)
# 测试实际错误案例
test_case = """```json
{
"totalWords": 40000,
"tolerance": 2000,
"globalGuidelines": [
"重点突出技术红利分配失衡、人才流失与职业认同危机等结构性矛盾"
"详略策略:技术创新与传统技艺的碰撞"
],
"chapters": []
}
```"""
print("\n测试案例:")
print(test_case)
print("\n" + "=" * 60)
try:
result = parser.parse(test_case, "手动测试")
print("\n✓ 解析成功!")
print("\n解析结果:")
print(json.dumps(result, ensure_ascii=False, indent=2))
except Exception as e:
print(f"\n✗ 解析失败: {e}")
print("\n" + "=" * 60)
if __name__ == "__main__":
# 运行手动测试
run_manual_test()
# 运行单元测试
print("\n\n运行单元测试...")
unittest.main(verbosity=2)