hzywhite
2025-09-05 14:56:35 +08:00
parent 6877983a71
commit 9872b86d13
2 changed files with 88 additions and 50 deletions

View File

@@ -32,6 +32,14 @@ from typing import (
T = TypeVar("T")
class MineruExecutionError(Exception):
"""catch mineru error"""
def __init__(self, return_code, error_msg):
self.return_code = return_code
self.error_msg = error_msg
super().__init__(f"Mineru command failed with return code {return_code}: {error_msg}")
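A minimal usage sketch (illustrative, not part of the commit): the exception keeps the raw return code and whatever error lines were captured, so callers can log or persist them.

    import logging

    # Hypothetical values standing in for a failed MinerU run.
    captured_stderr = ["RuntimeError: model weights not found"]
    try:
        raise MineruExecutionError(1, captured_stderr)
    except MineruExecutionError as exc:
        logging.error("MinerU failed with code %s: %s", exc.return_code, exc.error_msg)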
class Parser:
"""
Base class for document parsing utilities.
@@ -607,6 +615,9 @@ class MineruParser(Parser):
if vlm_url:
cmd.extend(["-u", vlm_url])
output_lines = []
error_lines = []
try:
# Prepare subprocess parameters to hide console window on Windows
import platform
@@ -665,6 +676,7 @@ class MineruParser(Parser):
try:
while True:
prefix, line = stdout_queue.get_nowait()
output_lines.append(line)
# Log mineru output with INFO level, prefixed with [MinerU]
logging.info(f"[MinerU] {line}")
except Empty:
@@ -679,6 +691,8 @@ class MineruParser(Parser):
logging.warning(f"[MinerU] {line}")
elif "error" in line.lower():
logging.error(f"[MinerU] {line}")
error_message = line.split("\n")[0]
error_lines.append(error_message)
else:
logging.info(f"[MinerU] {line}")
except Empty:
@@ -693,6 +707,7 @@ class MineruParser(Parser):
try:
while True:
prefix, line = stdout_queue.get_nowait()
output_lines.append(line)
logging.info(f"[MinerU] {line}")
except Empty:
pass
@@ -704,6 +719,8 @@ class MineruParser(Parser):
logging.warning(f"[MinerU] {line}")
elif "error" in line.lower():
logging.error(f"[MinerU] {line}")
error_message = line.split("\n")[0]
error_lines.append(error_message)
else:
logging.info(f"[MinerU] {line}")
except Empty:
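The queue-draining loops above follow the usual reader-thread pattern: a daemon thread pushes each line from the subprocess pipe onto a Queue, and the main loop drains it with get_nowait() until queue.Empty. A standalone sketch of that pattern, with illustrative names (_enqueue_output and the stand-in command are not from the file):

    import subprocess
    import time
    from queue import Empty, Queue
    from threading import Thread

    def _enqueue_output(stream, q, prefix):
        # Reader thread: forward each line from the pipe to the main thread.
        for raw in iter(stream.readline, ""):
            q.put((prefix, raw.rstrip("\n")))
        stream.close()

    # Any CLI works here; it stands in for the mineru command.
    proc = subprocess.Popen(
        ["python", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )
    stdout_queue: Queue = Queue()
    Thread(target=_enqueue_output, args=(proc.stdout, stdout_queue, "STDOUT"), daemon=True).start()

    while proc.poll() is None:
        try:
            while True:  # drain whatever is buffered without blocking
                prefix, line = stdout_queue.get_nowait()
                print(f"[MinerU] {line}")
        except Empty:
            pass
        time.sleep(0.1)
    # (the real code drains the queues once more after the process exits)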
@@ -716,13 +733,16 @@ class MineruParser(Parser):
stdout_thread.join(timeout=5)
stderr_thread.join(timeout=5)
-if return_code == 0:
-    logging.info("[MinerU] Command executed successfully")
-else:
-    raise subprocess.CalledProcessError(return_code, cmd)
+if return_code != 0 or error_lines:
+    logging.error("[MinerU] Command execution failed")
+    raise MineruExecutionError(return_code, error_lines)
+logging.info("[MinerU] Command executed successfully")
except MineruExecutionError as e:
raise
except subprocess.CalledProcessError as e:
logging.error(f"Error running mineru command: {e}")
logging.error(f"Error running mineru subprocess command: {e}")
logging.error(f"Command: {' '.join(cmd)}")
logging.error(f"Return code: {e.returncode}")
raise
@@ -732,8 +752,9 @@ class MineruParser(Parser):
"pip install -U 'mineru[core]' or uv pip install -U 'mineru[core]'"
)
except Exception as e:
logging.error(f"Unexpected error running mineru command: {e}")
raise
error_message = f"Unexpected error running mineru command: {e}"
logging.error(error_message)
raise RuntimeError(error_message) from e
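After this change the command runner distinguishes three failure shapes: MineruExecutionError for a non-zero exit or errors seen on stderr, CalledProcessError from the subprocess machinery, and a RuntimeError wrapping anything unexpected. A hedged caller-side sketch (file path and method value are placeholders):

    import logging
    import subprocess

    from raganything.parser import MineruExecutionError, MineruParser

    try:
        content_list = MineruParser().parse_document(
            file_path="paper.pdf", method="auto", output_dir="./mineru_out"
        )
    except MineruExecutionError as exc:
        # The MinerU CLI itself failed; error_msg holds the captured error lines.
        logging.error("MinerU exited with %s: %s", exc.return_code, exc.error_msg)
    except subprocess.CalledProcessError as exc:
        logging.error("Subprocess failed with return code %s", exc.returncode)
    except RuntimeError as exc:
        logging.error("Unexpected failure while running MinerU: %s", exc)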
@staticmethod
def _read_output_files(
@@ -858,6 +879,8 @@ class MineruParser(Parser):
)
return content_list
except MineruExecutionError as e:
raise
except Exception as e:
logging.error(f"Error in parse_pdf: {str(e)}")
raise
@@ -996,6 +1019,9 @@ class MineruParser(Parser):
)
return content_list
except MineruExecutionError as e:
raise
finally:
# Clean up temporary converted file if it was created
if temp_converted_file and temp_converted_file.exists():

View File

@@ -12,7 +12,7 @@ from typing import Dict, List, Any, Tuple, Optional
from pathlib import Path
from raganything.base import DocStatus
-from raganything.parser import MineruParser, DoclingParser
+from raganything.parser import MineruParser, DoclingParser, MineruExecutionError
from raganything.utils import (
separate_content,
insert_text_content,
@@ -395,18 +395,14 @@ class ProcessorMixin:
**kwargs,
)
+except MineruExecutionError as e:
+    self.logger.error(f"Mineru command failed: {e}")
+    raise
except Exception as e:
    self.logger.error(
        f"Error during parsing with {self.config.parser} parser: {str(e)}"
    )
-    self.logger.warning("Falling back to MinerU parser...")
-    # If specific parser fails, fall back to MinerU parser
-    content_list = MineruParser().parse_document(
-        file_path=file_path,
-        method=parse_method,
-        output_dir=output_dir,
-        **kwargs,
-    )
+    raise e
self.logger.info(
f"Parsing {file_path} complete! Extracted {len(content_list)} content blocks"
@@ -668,6 +664,9 @@ class ProcessorMixin:
entity_vdb=self.lightrag.entities_vdb,
relationships_vdb=self.lightrag.relationships_vdb,
global_config=self.lightrag.__dict__,
full_entities_storage=self.lightrag.full_entities,
full_relations_storage=self.lightrag.full_relations,
doc_id=doc_id,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.lightrag.llm_response_cache,
@@ -1400,6 +1399,7 @@ class ProcessorMixin:
split_by_character_only: bool = False,
doc_id: str | None = None,
scheme_name: str | None = None,
parser: str | None = None,
**kwargs,
):
"""
@@ -1420,27 +1420,26 @@ class ProcessorMixin:
pipeline_status = None
pipeline_status_lock = None
if parser:
self.config.parser = parser
current_doc_status = await self.lightrag.doc_status.get_by_id(
doc_pre_id
)
try:
# Ensure LightRAG is initialized
result = await self._ensure_lightrag_initialized()
if not result["success"]:
-current_doc_status = await self.lightrag.doc_status.get_by_id(
-    doc_pre_id
-)
-if not current_doc_status:
-    await self.lightrag.doc_status.upsert(
-        {
-            doc_pre_id: {
-                **current_doc_status,
-                "status": DocStatus.FAILED,
-                "error_msg": result["error"],
-            }
-        }
-    )
-    current_doc_status = await self.lightrag.doc_status.get_by_id(
-        doc_pre_id
-    )
+await self.lightrag.doc_status.upsert(
+    {
+        doc_pre_id: {
+            **current_doc_status,
+            "status": DocStatus.FAILED,
+            "error_msg": result["error"],
+        }
+    }
+)
return False
# Use config defaults if not provided
@@ -1490,13 +1489,32 @@ class ProcessorMixin:
pipeline_status["history_messages"].append("Now is not allowed to scan")
await self.lightrag.doc_status.upsert(
-{doc_pre_id: {**current_doc_status, "status": DocStatus.HANDLING}}
+{doc_pre_id: {**current_doc_status, "status": DocStatus.HANDLING, "error_msg": ""}}
)
content_list = []
content_based_doc_id = ''
try:
# Step 1: Parse document
-content_list, content_based_doc_id = await self.parse_document(
-    file_path, output_dir, parse_method, display_stats, **kwargs
-)
+    content_list, content_based_doc_id = await self.parse_document(
+        file_path, output_dir, parse_method, display_stats, **kwargs
+    )
except MineruExecutionError as e:
error_message = e.error_msg
if isinstance(e.error_msg, list):
error_message = "\n".join(e.error_msg)
await self.lightrag.doc_status.upsert(
{doc_pre_id: {**current_doc_status, "status": DocStatus.FAILED, "error_msg": error_message}}
)
self.logger.info(f"Error processing document {file_path}: MineruExecutionError")
return False
except Exception as e:
await self.lightrag.doc_status.upsert(
{doc_pre_id: {**current_doc_status, "status": DocStatus.FAILED, "error_msg": str(e)}}
)
self.logger.info(f"Error processing document {file_path}: {str(e)}")
return False
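Both failure branches above write a flat string into error_msg; when MineruExecutionError carries a list of stderr lines, they are joined first. The normalization could also live in a small helper, sketched here (the helper itself is not in the commit; the record shape mirrors the dicts used above):

    from raganything.base import DocStatus

    def failed_status_record(current_doc_status: dict, error) -> dict:
        # error may be a list of captured stderr lines, an exception, or a plain string.
        if isinstance(error, list):
            error_message = "\n".join(error)
        else:
            error_message = str(error)
        return {
            **current_doc_status,
            "status": DocStatus.FAILED,
            "error_msg": error_message,
        }

    # e.g. await self.lightrag.doc_status.upsert({doc_pre_id: failed_status_record(current_doc_status, e.error_msg)})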
# Use provided doc_id or fall back to content-based doc_id
if doc_id is None:
@@ -1546,22 +1564,16 @@ class ProcessorMixin:
self.logger.debug("Exception details:", exc_info=True)
# Update doc status to Failed
-try:
-    await self.lightrag.doc_status.upsert(
-        {
-            doc_pre_id: {
-                **current_doc_status,
-                "status": DocStatus.FAILED,
-                "error_msg": str(e),
-            }
-        }
-    )
-    await self.lightrag.doc_status.index_done_callback()
-    self.logger.info(f"Updated doc_status to Failed for {doc_pre_id}")
-except Exception as status_update_error:
-    self.logger.error(
-        f"Failed to update doc_status to Failed: {status_update_error}"
-    )
+await self.lightrag.doc_status.upsert(
+    {
+        doc_pre_id: {
+            **current_doc_status,
+            "status": DocStatus.FAILED,
+            "error_msg": str(e),
+        }
+    }
+)
+await self.lightrag.doc_status.index_done_callback()
# Update pipeline status
if pipeline_status_lock and pipeline_status: