Merge pull request #113 from HKUDS/ui

Add RAGAnything processing to LightRAG's webui
zrguo
2025-09-16 10:20:56 +08:00
committed by GitHub
2 changed files with 198 additions and 69 deletions

View File

@@ -32,6 +32,17 @@ from typing import (
T = TypeVar("T")
class MineruExecutionError(Exception):
"""catch mineru error"""
def __init__(self, return_code, error_msg):
self.return_code = return_code
self.error_msg = error_msg
super().__init__(
f"Mineru command failed with return code {return_code}: {error_msg}"
)
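
For reference, the two fields travel to callers like this; a minimal sketch, where report_failure and run_cmd are illustrative stand-ins rather than code from this commit:

    import logging

    def report_failure(run_cmd):
        # run_cmd stands in for MineruParser's internal command execution
        try:
            run_cmd()
        except MineruExecutionError as e:
            # return_code and error_msg are the attributes set in __init__ above
            logging.error("mineru exited with %s: %s", e.return_code, e.error_msg)
            raise
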
class Parser:
"""
Base class for document parsing utilities.
@@ -607,6 +618,9 @@ class MineruParser(Parser):
if vlm_url:
cmd.extend(["-u", vlm_url])
output_lines = []
error_lines = []
try:
# Prepare subprocess parameters to hide console window on Windows
import platform
@@ -665,6 +679,7 @@ class MineruParser(Parser):
try:
while True:
prefix, line = stdout_queue.get_nowait()
output_lines.append(line)
# Log mineru output with INFO level, prefixed with [MinerU]
logging.info(f"[MinerU] {line}")
except Empty:
@@ -679,6 +694,8 @@ class MineruParser(Parser):
logging.warning(f"[MinerU] {line}")
elif "error" in line.lower():
logging.error(f"[MinerU] {line}")
error_message = line.split("\n")[0]
error_lines.append(error_message)
else:
logging.info(f"[MinerU] {line}")
except Empty:
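
Both drain loops rely on queue.Queue.get_nowait(), which raises queue.Empty once the buffer is exhausted; a self-contained sketch of the pattern (the queue contents here are illustrative):

    import queue

    q = queue.Queue()
    q.put(("stdout", "example line"))

    lines = []
    try:
        while True:
            prefix, line = q.get_nowait()  # raises queue.Empty when drained
            lines.append(line)
    except queue.Empty:
        pass  # nothing buffered right now; keep polling the process
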
@@ -693,6 +710,7 @@ class MineruParser(Parser):
try:
while True:
prefix, line = stdout_queue.get_nowait()
output_lines.append(line)
logging.info(f"[MinerU] {line}")
except Empty:
pass
@@ -704,6 +722,8 @@ class MineruParser(Parser):
logging.warning(f"[MinerU] {line}")
elif "error" in line.lower():
logging.error(f"[MinerU] {line}")
error_message = line.split("\n")[0]
error_lines.append(error_message)
else:
logging.info(f"[MinerU] {line}")
except Empty:
@@ -716,13 +736,16 @@ class MineruParser(Parser):
stdout_thread.join(timeout=5)
stderr_thread.join(timeout=5)
-            if return_code == 0:
-                logging.info("[MinerU] Command executed successfully")
+            if return_code != 0 or error_lines:
+                logging.info("[MinerU] Command execution failed")
+                raise MineruExecutionError(return_code, error_lines)
             else:
-                raise subprocess.CalledProcessError(return_code, cmd)
+                logging.info("[MinerU] Command executed successfully")
+        except MineruExecutionError:
+            raise
         except subprocess.CalledProcessError as e:
-            logging.error(f"Error running mineru command: {e}")
+            logging.error(f"Error running mineru subprocess command: {e}")
             logging.error(f"Command: {' '.join(cmd)}")
             logging.error(f"Return code: {e.returncode}")
             raise
@@ -732,8 +755,9 @@ class MineruParser(Parser):
"pip install -U 'mineru[core]' or uv pip install -U 'mineru[core]'"
)
         except Exception as e:
-            logging.error(f"Unexpected error running mineru command: {e}")
-            raise
+            error_message = f"Unexpected error running mineru command: {e}"
+            logging.error(error_message)
+            raise RuntimeError(error_message) from e
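
The raise ... from e form preserves the original exception as __cause__, so callers still see the root failure; a minimal demonstration:

    try:
        try:
            raise ValueError("original failure")
        except ValueError as e:
            raise RuntimeError("wrapped for callers") from e
    except RuntimeError as err:
        assert isinstance(err.__cause__, ValueError)  # original is preserved
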
@staticmethod
def _read_output_files(
@@ -858,6 +882,8 @@ class MineruParser(Parser):
)
return content_list
except MineruExecutionError:
raise
except Exception as e:
logging.error(f"Error in parse_pdf: {str(e)}")
raise
@@ -996,6 +1022,9 @@ class MineruParser(Parser):
)
return content_list
except MineruExecutionError:
raise
finally:
# Clean up temporary converted file if it was created
if temp_converted_file and temp_converted_file.exists():

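Taken together, the parser changes collect error-tagged output while streaming and fail with a typed exception instead of a bare CalledProcessError. A condensed sketch of that flow, assuming a blocking subprocess.run in place of the real streaming threads:

    import logging
    import subprocess

    def run_mineru(cmd):
        # Simplified stand-in for MineruParser's threaded output handling
        proc = subprocess.run(cmd, capture_output=True, text=True)
        error_lines = [
            line for line in proc.stderr.splitlines() if "error" in line.lower()
        ]
        if proc.returncode != 0 or error_lines:
            # Same check as the new code: nonzero exit OR any error-tagged stderr
            raise MineruExecutionError(proc.returncode, error_lines)
        logging.info("[MinerU] Command executed successfully")
        return proc.stdout
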
View File

@@ -12,7 +12,7 @@ from typing import Dict, List, Any, Tuple, Optional
from pathlib import Path
from raganything.base import DocStatus
-from raganything.parser import MineruParser, DoclingParser
+from raganything.parser import MineruParser, DoclingParser, MineruExecutionError
from raganything.utils import (
separate_content,
insert_text_content,
@@ -395,22 +395,20 @@ class ProcessorMixin:
**kwargs,
)
+        except MineruExecutionError as e:
+            self.logger.error(f"Mineru command failed: {e}")
+            raise
         except Exception as e:
             self.logger.error(
                 f"Error during parsing with {self.config.parser} parser: {str(e)}"
             )
-            self.logger.warning("Falling back to MinerU parser...")
-            # If specific parser fails, fall back to MinerU parser
-            content_list = MineruParser().parse_document(
-                file_path=file_path,
-                method=parse_method,
-                output_dir=output_dir,
-                **kwargs,
-            )
+            raise e
-        self.logger.info(
-            f"Parsing {file_path} complete! Extracted {len(content_list)} content blocks"
-        )
+        msg = f"Parsing {file_path} complete! Extracted {len(content_list)} content blocks"
+        self.logger.info(msg)
if len(content_list) == 0:
raise ValueError("Parsing failed: No content was extracted")
# Generate doc_id based on content
doc_id = self._generate_content_based_doc_id(content_list)
@@ -668,6 +666,9 @@ class ProcessorMixin:
entity_vdb=self.lightrag.entities_vdb,
relationships_vdb=self.lightrag.relationships_vdb,
global_config=self.lightrag.__dict__,
full_entities_storage=self.lightrag.full_entities,
full_relations_storage=self.lightrag.full_relations,
doc_id=doc_id,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.lightrag.llm_response_cache,
@@ -801,9 +802,9 @@ class ProcessorMixin:
# Stage 3: Store chunks to LightRAG storage
await self._store_chunks_to_lightrag_storage_type_aware(lightrag_chunks)
-        # Stage 3.5: Store multimodal main entities to entities_vdb
+        # Stage 3.5: Store multimodal main entities to entities_vdb and full_entities
         await self._store_multimodal_main_entities(
-            multimodal_data_list, lightrag_chunks, file_path
+            multimodal_data_list, lightrag_chunks, file_path, doc_id
         )
# Track chunk IDs for doc_status update
@@ -965,14 +966,17 @@ class ProcessorMixin:
multimodal_data_list: List[Dict[str, Any]],
lightrag_chunks: Dict[str, Any],
file_path: str,
+        doc_id: str = None,
     ):
         """
-        Store multimodal main entities to entities_vdb.
+        Store multimodal main entities to entities_vdb and full_entities.
         This ensures that entities like "TableName (table)" are properly indexed.
         Args:
             multimodal_data_list: List of processed multimodal data with entity info
             lightrag_chunks: Chunks in LightRAG format (already formatted with templates)
             file_path: File path for the entities
+            doc_id: Document ID for full_entities storage
"""
if not multimodal_data_list:
return
@@ -1034,14 +1038,79 @@ class ProcessorMixin:
await self.lightrag.entities_vdb.upsert(entities_to_store)
await self.lightrag.entities_vdb.index_done_callback()
# NEW: Store multimodal main entities in full_entities storage
if doc_id and self.lightrag.full_entities:
await self._store_multimodal_entities_to_full_entities(
entities_to_store, doc_id
)
         self.logger.debug(
-            f"Stored {len(entities_to_store)} multimodal main entities to knowledge graph and entities_vdb"
+            f"Stored {len(entities_to_store)} multimodal main entities to knowledge graph, entities_vdb, and full_entities"
         )
except Exception as e:
self.logger.error(f"Error storing multimodal main entities: {e}")
raise
async def _store_multimodal_entities_to_full_entities(
self, entities_to_store: Dict[str, Any], doc_id: str
):
"""
Store multimodal main entities to full_entities storage.
Args:
entities_to_store: Dictionary of entities to store
doc_id: Document ID for grouping entities
"""
try:
# Get current full_entities data for this document
current_doc_entities = await self.lightrag.full_entities.get_by_id(doc_id)
if current_doc_entities is None:
# Create new document entry
entity_names = list(
entity_data["entity_name"]
for entity_data in entities_to_store.values()
)
doc_entities_data = {
"entity_names": entity_names,
"count": len(entity_names),
"update_time": int(time.time()),
}
else:
# Update existing document entry
existing_entity_names = set(
current_doc_entities.get("entity_names", [])
)
new_entity_names = [
entity_data["entity_name"]
for entity_data in entities_to_store.values()
]
# Add new multimodal entities to the list (avoid duplicates)
for entity_name in new_entity_names:
existing_entity_names.add(entity_name)
doc_entities_data = {
"entity_names": list(existing_entity_names),
"count": len(existing_entity_names),
"update_time": int(time.time()),
}
# Store updated data
await self.lightrag.full_entities.upsert({doc_id: doc_entities_data})
await self.lightrag.full_entities.index_done_callback()
self.logger.debug(
f"Added {len(entities_to_store)} multimodal main entities to full_entities for doc {doc_id}"
)
except Exception as e:
self.logger.error(
f"Error storing multimodal entities to full_entities: {e}"
)
raise
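
The update path above is read-merge-write with set-based deduplication; the same logic in miniature, with a plain dict standing in for the full_entities storage API:

    import time

    store = {}  # stand-in for self.lightrag.full_entities

    def merge_entities(doc_id, new_names):
        existing = store.get(doc_id)
        names = set(existing["entity_names"]) if existing else set()
        names.update(new_names)  # duplicates collapse here
        store[doc_id] = {
            "entity_names": list(names),
            "count": len(names),
            "update_time": int(time.time()),
        }

    merge_entities("doc-1", ["TableName (table)"])
    merge_entities("doc-1", ["TableName (table)", "Figure 2 (image)"])
    assert store["doc-1"]["count"] == 2
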
async def _batch_extract_entities_lightrag_style_type_aware(
self, lightrag_chunks: Dict[str, Any]
) -> List[Tuple]:
@@ -1400,6 +1469,7 @@ class ProcessorMixin:
split_by_character_only: bool = False,
doc_id: str | None = None,
scheme_name: str | None = None,
parser: str | None = None,
**kwargs,
):
"""
@@ -1420,27 +1490,24 @@ class ProcessorMixin:
pipeline_status = None
pipeline_status_lock = None
+            if parser:
+                self.config.parser = parser
+            current_doc_status = await self.lightrag.doc_status.get_by_id(doc_pre_id)
         try:
             # Ensure LightRAG is initialized
             result = await self._ensure_lightrag_initialized()
             if not result["success"]:
-                current_doc_status = await self.lightrag.doc_status.get_by_id(
-                    doc_pre_id
-                )
-                if not current_doc_status:
-                    await self.lightrag.doc_status.upsert(
-                        {
-                            doc_pre_id: {
-                                **current_doc_status,
-                                "status": DocStatus.FAILED,
-                                "error_msg": result["error"],
-                            }
-                        }
-                    )
-                    current_doc_status = await self.lightrag.doc_status.get_by_id(
-                        doc_pre_id
-                    )
+                await self.lightrag.doc_status.upsert(
+                    {
+                        doc_pre_id: {
+                            **current_doc_status,
+                            "status": DocStatus.FAILED,
+                            "error_msg": result["error"],
+                        }
+                    }
+                )
return False
# Use config defaults if not provided
@@ -1490,13 +1557,52 @@ class ProcessorMixin:
pipeline_status["history_messages"].append("Now is not allowed to scan")
await self.lightrag.doc_status.upsert(
-            {doc_pre_id: {**current_doc_status, "status": DocStatus.HANDLING}}
+            {
+                doc_pre_id: {
+                    **current_doc_status,
+                    "status": DocStatus.HANDLING,
+                    "error_msg": "",
+                }
+            }
         )
-        # Step 1: Parse document
-        content_list, content_based_doc_id = await self.parse_document(
-            file_path, output_dir, parse_method, display_stats, **kwargs
-        )
+        content_list = []
+        content_based_doc_id = ""
+        try:
+            # Step 1: Parse document
+            content_list, content_based_doc_id = await self.parse_document(
+                file_path, output_dir, parse_method, display_stats, **kwargs
+            )
+        except MineruExecutionError as e:
+            error_message = e.error_msg
+            if isinstance(e.error_msg, list):
+                error_message = "\n".join(e.error_msg)
+            await self.lightrag.doc_status.upsert(
+                {
+                    doc_pre_id: {
+                        **current_doc_status,
+                        "status": DocStatus.FAILED,
+                        "error_msg": error_message,
+                    }
+                }
+            )
+            self.logger.info(
+                f"Error processing document {file_path}: MineruExecutionError"
+            )
+            return False
+        except Exception as e:
+            await self.lightrag.doc_status.upsert(
+                {
+                    doc_pre_id: {
+                        **current_doc_status,
+                        "status": DocStatus.FAILED,
+                        "error_msg": str(e),
+                    }
+                }
+            )
+            self.logger.info(f"Error processing document {file_path}: {str(e)}")
+            return False
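
Because MineruExecutionError.error_msg may be either a string or the list of collected stderr lines, the handler flattens it before persisting; the normalization on its own:

    def normalize_error_msg(error_msg):
        # error_msg is either a single string or a list of stderr lines
        if isinstance(error_msg, list):
            return "\n".join(error_msg)
        return error_msg
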
# Use provided doc_id or fall back to content-based doc_id
if doc_id is None:
@@ -1527,17 +1633,6 @@ class ProcessorMixin:
scheme_name=scheme_name,
)
-            # Success: Update pipeline status
-            async with pipeline_status_lock:
-                pipeline_status.update({"scan_disabled": False})
-                pipeline_status["latest_message"] = (
-                    f"RAGAnything processing completed for {file_name}"
-                )
-                pipeline_status["history_messages"].append(
-                    f"RAGAnything processing completed for {file_name}"
-                )
-                pipeline_status["history_messages"].append("Now is allowed to scan")
self.logger.info(f"Document {file_path} processing completed successfully")
return True
@@ -1546,22 +1641,16 @@ class ProcessorMixin:
self.logger.debug("Exception details:", exc_info=True)
# Update doc status to Failed
-            try:
-                await self.lightrag.doc_status.upsert(
-                    {
-                        doc_pre_id: {
-                            **current_doc_status,
-                            "status": DocStatus.FAILED,
-                            "error_msg": str(e),
-                        }
-                    }
-                )
-                await self.lightrag.doc_status.index_done_callback()
-                self.logger.info(f"Updated doc_status to Failed for {doc_pre_id}")
-            except Exception as status_update_error:
-                self.logger.error(
-                    f"Failed to update doc_status to Failed: {status_update_error}"
-                )
+            await self.lightrag.doc_status.upsert(
+                {
+                    doc_pre_id: {
+                        **current_doc_status,
+                        "status": DocStatus.FAILED,
+                        "error_msg": str(e),
+                    }
+                }
+            )
+            await self.lightrag.doc_status.index_done_callback()
# Update pipeline status
if pipeline_status_lock and pipeline_status:
@@ -1583,6 +1672,17 @@ class ProcessorMixin:
return False
finally:
async with pipeline_status_lock:
pipeline_status.update({"scan_disabled": False})
pipeline_status["latest_message"] = (
f"RAGAnything processing completed for {file_name}"
)
pipeline_status["history_messages"].append(
f"RAGAnything processing completed for {file_name}"
)
pipeline_status["history_messages"].append("Now is allowed to scan")
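
Moving the status reset into finally guarantees the scan flag is restored on every exit path, including early returns and exceptions; schematically (a toy example, not the pipeline API):

    status = {"scan_disabled": True}

    def process(fail):
        try:
            if fail:
                raise RuntimeError("parse failed")
            return True
        finally:
            status["scan_disabled"] = False  # runs on success, failure, and early return

    try:
        process(fail=True)
    except RuntimeError:
        pass
    assert status["scan_disabled"] is False
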
async def insert_content_list(
self,
content_list: List[Dict[str, Any]],