Merge branch 'HKUDS:main' into main

Author: Do Le Long An · 2025-09-06 15:01:38 +07:00 · committed by GitHub
5 changed files with 423 additions and 92 deletions

raganything/__init__.py

@@ -1,7 +1,7 @@
from .raganything import RAGAnything as RAGAnything
from .config import RAGAnythingConfig as RAGAnythingConfig
__version__ = "1.2.7"
__version__ = "1.2.8"
__author__ = "Zirui Guo"
__url__ = "https://github.com/HKUDS/RAG-Anything"

raganything/base.py (new file)

@@ -0,0 +1,12 @@
from enum import Enum
class DocStatus(str, Enum):
"""Document processing status"""
READY = "ready"
HANDLING = "handling"
PENDING = "pending"
PROCESSING = "processing"
PROCESSED = "processed"
FAILED = "failed"

raganything/processor.py

@@ -8,12 +8,15 @@ import os
import time
import hashlib
import json
-from typing import Dict, List, Any, Tuple
+from typing import Dict, List, Any, Tuple, Optional
from pathlib import Path
from raganything.base import DocStatus
from raganything.parser import MineruParser, DoclingParser
from raganything.utils import (
separate_content,
insert_text_content,
insert_text_content_with_multimodal_content,
get_processor_for_type,
)
import asyncio
@@ -325,7 +328,8 @@ class ProcessorMixin:
if ext in [".pdf"]:
self.logger.info("Detected PDF file, using parser for PDF...")
-content_list = doc_parser.parse_pdf(
+content_list = await asyncio.to_thread(
+    doc_parser.parse_pdf,
pdf_path=file_path,
output_dir=output_dir,
method=parse_method,
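
Note: the recurring change in this file wraps each blocking parser call in asyncio.to_thread so parsing no longer stalls the event loop. A self-contained sketch of the idiom (this parse_pdf is a stand-in for any synchronous parser method, not the real MineruParser API):

import asyncio
import time

def parse_pdf(pdf_path: str, method: str = "auto") -> list:
    # stand-in for a blocking, CPU/IO-bound parser call
    time.sleep(1)
    return [{"type": "text", "text": f"parsed {pdf_path} with {method}"}]

async def main():
    # to_thread forwards positional and keyword arguments to the worker thread
    content_list = await asyncio.to_thread(parse_pdf, pdf_path="doc.pdf", method="auto")
    print(len(content_list), "content blocks")

asyncio.run(main())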
@@ -344,8 +348,11 @@ class ProcessorMixin:
self.logger.info("Detected image file, using parser for images...")
# Use the selected parser's image parsing capability
if hasattr(doc_parser, "parse_image"):
-content_list = doc_parser.parse_image(
-    image_path=file_path, output_dir=output_dir, **kwargs
+content_list = await asyncio.to_thread(
+    doc_parser.parse_image,
+    image_path=file_path,
+    output_dir=output_dir,
+    **kwargs,
)
else:
# Fallback to MinerU for image parsing if current parser doesn't support it
@@ -369,15 +376,19 @@ class ProcessorMixin:
self.logger.info(
"Detected Office or HTML document, using parser for Office/HTML..."
)
-content_list = doc_parser.parse_office_doc(
-    doc_path=file_path, output_dir=output_dir, **kwargs
+content_list = await asyncio.to_thread(
+    doc_parser.parse_office_doc,
+    doc_path=file_path,
+    output_dir=output_dir,
+    **kwargs,
)
else:
# For other or unknown formats, use generic parser
self.logger.info(
f"Using generic parser for {ext} file (method={parse_method})..."
)
-content_list = doc_parser.parse_document(
+content_list = await asyncio.to_thread(
+    doc_parser.parse_document,
file_path=file_path,
method=parse_method,
output_dir=output_dir,
@@ -398,7 +409,7 @@ class ProcessorMixin:
)
self.logger.info(
f"Parsing complete! Extracted {len(content_list)} content blocks"
f"Parsing {file_path} complete! Extracted {len(content_list)} content blocks"
)
# Generate doc_id based on content
@@ -429,7 +440,12 @@ class ProcessorMixin:
return content_list, doc_id
async def _process_multimodal_content(
-self, multimodal_items: List[Dict[str, Any]], file_path: str, doc_id: str
+self,
+multimodal_items: List[Dict[str, Any]],
+file_path: str,
+doc_id: str,
+pipeline_status: Optional[Any] = None,
+pipeline_status_lock: Optional[Any] = None,
):
"""
Process multimodal content (using specialized processors)
@@ -438,7 +454,10 @@ class ProcessorMixin:
multimodal_items: List of multimodal items
file_path: File path (for reference)
doc_id: Document ID for proper chunk association
pipeline_status: Pipeline status object
pipeline_status_lock: Pipeline status lock
"""
if not multimodal_items:
self.logger.debug("No multimodal content to process")
return
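
Note: the new optional pipeline_status / pipeline_status_lock pair mirrors every progress log into a shared dict guarded by an async lock, so concurrent LightRAG workers see consistent status. A minimal sketch of the update idiom used in the hunks below (the dict shape is inferred from the fields the diff touches):

import asyncio

async def report(pipeline_status, pipeline_status_lock, message: str):
    # update the shared status only when both the dict and its lock were provided
    if pipeline_status_lock and pipeline_status is not None:
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = message
            pipeline_status["history_messages"].append(message)

async def main():
    status = {"latest_message": "", "history_messages": []}
    lock = asyncio.Lock()
    await report(status, lock, "Starting multimodal content processing...")
    print(status["history_messages"])

asyncio.run(main())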
@@ -477,9 +496,17 @@ class ProcessorMixin:
# Continue with processing if cache check fails
# Use ProcessorMixin's own batch processing that can handle multiple content types
self.logger.info("Starting multimodal content processing...")
log_message = "Starting multimodal content processing..."
self.logger.info(log_message)
if pipeline_status_lock and pipeline_status:
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
try:
# Ensure LightRAG is initialized
await self._ensure_lightrag_initialized()
await self._process_multimodal_content_batch_type_aware(
multimodal_items=multimodal_items, file_path=file_path, doc_id=doc_id
)
@@ -487,7 +514,12 @@ class ProcessorMixin:
# Mark multimodal content as processed and update final status
await self._mark_multimodal_processing_complete(doc_id)
self.logger.info("Multimodal content processing complete")
log_message = "Multimodal content processing complete"
self.logger.info(log_message)
if pipeline_status_lock and pipeline_status:
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
self.logger.error(f"Error in multimodal processing: {e}")
@@ -789,7 +821,7 @@ class ProcessorMixin:
# Stage 6: Use LightRAG's batch merge
await self._batch_merge_lightrag_style_type_aware(
-enhanced_chunk_results, file_path
+enhanced_chunk_results, file_path, doc_id
)
# Stage 7: Update doc_status with integrated chunks_list
@@ -1104,7 +1136,7 @@ class ProcessorMixin:
return enhanced_chunk_results
async def _batch_merge_lightrag_style_type_aware(
-self, enhanced_chunk_results: List[Tuple], file_path: str
+self, enhanced_chunk_results: List[Tuple], file_path: str, doc_id: str = None
):
"""Use LightRAG's merge_nodes_and_edges for batch merge"""
from lightrag.kg.shared_storage import (
@@ -1122,6 +1154,9 @@ class ProcessorMixin:
entity_vdb=self.lightrag.entities_vdb,
relationships_vdb=self.lightrag.relationships_vdb,
global_config=self.lightrag.__dict__,
full_entities_storage=self.lightrag.full_entities,
full_relations_storage=self.lightrag.full_relations,
doc_id=doc_id,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.lightrag.llm_response_cache,
@@ -1335,7 +1370,7 @@ class ProcessorMixin:
file_name = os.path.basename(file_path)
await insert_text_content(
self.lightrag,
-text_content,
+input=text_content,
file_paths=file_name,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
@@ -1355,6 +1390,199 @@ class ProcessorMixin:
self.logger.info(f"Document {file_path} processing complete!")
async def process_document_complete_lightrag_api(
self,
file_path: str,
output_dir: str = None,
parse_method: str = None,
display_stats: bool = None,
split_by_character: str | None = None,
split_by_character_only: bool = False,
doc_id: str | None = None,
scheme_name: str | None = None,
**kwargs,
):
"""
Complete document processing workflow; this entry point is intended to be called exclusively by LightRAG.
Args:
file_path: Path to the file to process
output_dir: output directory (defaults to config.parser_output_dir)
parse_method: Parse method (defaults to config.parse_method)
display_stats: Whether to display content statistics (defaults to config.display_content_stats)
split_by_character: Optional character to split the text by
split_by_character_only: If True, split only by the specified character
doc_id: Optional document ID, if not provided will be generated from content
scheme_name: Optional scheme name; stored with the document's doc_status entry
**kwargs: Additional parameters for parser (e.g., lang, device, start_page, end_page, formula, table, backend, source)
"""
file_name = os.path.basename(file_path)
doc_pre_id = f"doc-pre-{file_name}"
pipeline_status = None
pipeline_status_lock = None
current_doc_status = None  # predeclared so the except branch can reference it safely
try:
# Ensure LightRAG is initialized
result = await self._ensure_lightrag_initialized()
if not result["success"]:
current_doc_status = await self.lightrag.doc_status.get_by_id(
doc_pre_id
)
if not current_doc_status:
await self.lightrag.doc_status.upsert(
{
doc_pre_id: {
**(current_doc_status or {}),  # guard: no prior status exists on this branch
"status": DocStatus.FAILED,
"error_msg": result["error"],
}
}
)
current_doc_status = await self.lightrag.doc_status.get_by_id(
doc_pre_id
)
return False
# Use config defaults if not provided
if output_dir is None:
output_dir = self.config.parser_output_dir
if parse_method is None:
parse_method = self.config.parse_method
if display_stats is None:
display_stats = self.config.display_content_stats
self.logger.info(f"Starting complete document processing: {file_path}")
# Initialize doc status
current_doc_status = await self.lightrag.doc_status.get_by_id(doc_pre_id)
if not current_doc_status:
await self.lightrag.doc_status.upsert(
{
doc_pre_id: {
"status": DocStatus.READY,
"content": "",
"error_msg": "",
"content_summary": "",
"multimodal_content": [],
"scheme_name": scheme_name,
"content_length": 0,
"created_at": "",
"updated_at": "",
"file_path": file_name,
}
}
)
current_doc_status = await self.lightrag.doc_status.get_by_id(
doc_pre_id
)
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Set processing status
async with pipeline_status_lock:
pipeline_status.update({"scan_disabled": True})
pipeline_status["history_messages"].append("Now is not allowed to scan")
await self.lightrag.doc_status.upsert(
{doc_pre_id: {**current_doc_status, "status": DocStatus.HANDLING}}
)
# Step 1: Parse document
content_list, content_based_doc_id = await self.parse_document(
file_path, output_dir, parse_method, display_stats, **kwargs
)
# Use provided doc_id or fall back to content-based doc_id
if doc_id is None:
doc_id = content_based_doc_id
# Step 2: Separate text and multimodal content
text_content, multimodal_items = separate_content(content_list)
# Step 2.5: Set content source for context extraction in multimodal processing
if hasattr(self, "set_content_source_for_context") and multimodal_items:
self.logger.info(
"Setting content source for context-aware multimodal processing..."
)
self.set_content_source_for_context(
content_list, self.config.content_format
)
# Step 3: Insert pure text content and multimodal content with all parameters
if text_content.strip():
await insert_text_content_with_multimodal_content(
self.lightrag,
input=text_content,
multimodal_content=multimodal_items,
file_paths=file_name,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=doc_id,
scheme_name=scheme_name,
)
# Success: Update pipeline status
async with pipeline_status_lock:
pipeline_status.update({"scan_disabled": False})
pipeline_status["latest_message"] = (
f"RAGAnything processing completed for {file_name}"
)
pipeline_status["history_messages"].append(
f"RAGAnything processing completed for {file_name}"
)
pipeline_status["history_messages"].append("Now is allowed to scan")
self.logger.info(f"Document {file_path} processing completed successfully")
return True
except Exception as e:
self.logger.error(f"Error processing document {file_path}: {str(e)}")
self.logger.debug("Exception details:", exc_info=True)
# Update doc status to Failed
try:
await self.lightrag.doc_status.upsert(
{
doc_pre_id: {
**(current_doc_status or {}),  # guard: status may not have been created yet
"status": DocStatus.FAILED,
"error_msg": str(e),
}
}
)
await self.lightrag.doc_status.index_done_callback()
self.logger.info(f"Updated doc_status to Failed for {doc_pre_id}")
except Exception as status_update_error:
self.logger.error(
f"Failed to update doc_status to Failed: {status_update_error}"
)
# Update pipeline status
if pipeline_status_lock and pipeline_status:
try:
async with pipeline_status_lock:
pipeline_status.update({"scan_disabled": False})
error_msg = (
f"RAGAnything processing failed for {file_name}: {str(e)}"
)
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(error_msg)
pipeline_status["history_messages"].append(
"Now is allowed to scan"
)
except Exception as pipeline_update_error:
self.logger.error(
f"Failed to update pipeline status: {pipeline_update_error}"
)
return False
async def insert_content_list(
self,
content_list: List[Dict[str, Any]],
@@ -1438,7 +1666,7 @@ class ProcessorMixin:
file_name = os.path.basename(file_path)
await insert_text_content(
self.lightrag,
-text_content,
+input=text_content,
file_paths=file_name,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
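
Note: a hedged calling sketch for the new process_document_complete_lightrag_api entry point; unlike process_document_complete, it reports failure by returning False (with details in doc_status and pipeline_status) instead of raising. The constructor arguments here are assumptions drawn from the repository's examples:

import asyncio
from raganything import RAGAnything, RAGAnythingConfig

async def main():
    # assumes llm_model_func / embedding_func (or a pre-built LightRAG instance)
    # are wired in, as in the repository's examples
    rag = RAGAnything(config=RAGAnythingConfig())
    ok = await rag.process_document_complete_lightrag_api(
        file_path="report.pdf",
        scheme_name="default",  # optional label persisted in doc_status
    )
    print("processed" if ok else "failed; see doc_status / pipeline_status for details")

asyncio.run(main())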

raganything/raganything.py

@@ -91,6 +91,9 @@ class RAGAnything(QueryMixin, ProcessorMixin, BatchMixin):
parse_cache: Optional[Any] = field(default=None, init=False)
"""Parse result cache storage using LightRAG KV storage."""
_parser_installation_checked: bool = field(default=False, init=False)
"""Flag to track if parser installation has been checked."""
def __post_init__(self):
"""Post-initialization setup following LightRAG pattern"""
# Initialize configuration if not provided
@@ -219,36 +222,105 @@ class RAGAnything(QueryMixin, ProcessorMixin, BatchMixin):
async def _ensure_lightrag_initialized(self):
"""Ensure LightRAG instance is initialized, create if necessary"""
try:
+# Check parser installation first
+if not self._parser_installation_checked:
+    if not self.doc_parser.check_installation():
+        error_msg = (
+            f"Parser '{self.config.parser}' is not properly installed. "
+            "Please install it using 'pip install' or 'uv pip install'."
+        )
+        self.logger.error(error_msg)
+        return {"success": False, "error": error_msg}
-# Check parser installation first
-if not self.doc_parser.check_installation():
-    raise RuntimeError(
-        f"Parser '{self.config.parser}' is not properly installed. "
-        "Please install it using pip install or uv pip install."
-    )
+    self._parser_installation_checked = True
+    self.logger.info(f"Parser '{self.config.parser}' installation verified")
-if self.lightrag is not None:
-    # LightRAG was pre-provided, but we need to ensure it's properly initialized
-    # and that parse_cache is set up
+if self.lightrag is not None:
+    # LightRAG was pre-provided, but we need to ensure it's properly initialized
+    try:
+        # Ensure LightRAG storages are initialized
+        if (
+            not hasattr(self.lightrag, "_storages_status")
+            or self.lightrag._storages_status.name != "INITIALIZED"
+        ):
+            self.logger.info(
+                "Initializing storages for pre-provided LightRAG instance"
+            )
+            await self.lightrag.initialize_storages()
+            from lightrag.kg.shared_storage import (
+                initialize_pipeline_status,
+            )
-    # Ensure LightRAG storages are initialized
-    if (
-        not hasattr(self.lightrag, "_storages_status")
-        or self.lightrag._storages_status.name != "INITIALIZED"
-    ):
-        self.logger.info(
-            "Initializing storages for pre-provided LightRAG instance"
-        )
await initialize_pipeline_status()
# Initialize parse cache if not already done
if self.parse_cache is None:
self.logger.info(
"Initializing parse cache for pre-provided LightRAG instance"
)
self.parse_cache = (
self.lightrag.key_string_value_json_storage_cls(
namespace="parse_cache",
workspace=self.lightrag.workspace,
global_config=self.lightrag.__dict__,
embedding_func=self.embedding_func,
)
)
await self.parse_cache.initialize()
# Initialize processors if not already done
if not self.modal_processors:
self._initialize_processors()
return {"success": True}
except Exception as e:
error_msg = (
f"Failed to initialize pre-provided LightRAG instance: {str(e)}"
)
self.logger.error(error_msg, exc_info=True)
return {"success": False, "error": error_msg}
# Validate required functions for creating new LightRAG instance
if self.llm_model_func is None:
error_msg = "llm_model_func must be provided when LightRAG is not pre-initialized"
self.logger.error(error_msg)
return {"success": False, "error": error_msg}
if self.embedding_func is None:
error_msg = "embedding_func must be provided when LightRAG is not pre-initialized"
self.logger.error(error_msg)
return {"success": False, "error": error_msg}
from lightrag.kg.shared_storage import initialize_pipeline_status
# Prepare LightRAG initialization parameters
lightrag_params = {
"working_dir": self.working_dir,
"llm_model_func": self.llm_model_func,
"embedding_func": self.embedding_func,
}
# Merge user-provided lightrag_kwargs, which can override defaults
lightrag_params.update(self.lightrag_kwargs)
# Log the parameters being used for initialization (excluding sensitive data)
log_params = {
k: v
for k, v in lightrag_params.items()
if not callable(v)
and k not in ["llm_model_kwargs", "vector_db_storage_cls_kwargs"]
}
self.logger.info(f"Initializing LightRAG with parameters: {log_params}")
try:
# Create LightRAG instance with merged parameters
self.lightrag = LightRAG(**lightrag_params)
await self.lightrag.initialize_storages()
from lightrag.kg.shared_storage import initialize_pipeline_status
await initialize_pipeline_status()
-# Initialize parse cache if not already done
-if self.parse_cache is None:
-    self.logger.info(
-        "Initializing parse cache for pre-provided LightRAG instance"
-    )
# Initialize parse cache storage using LightRAG's KV storage
self.parse_cache = self.lightrag.key_string_value_json_storage_cls(
namespace="parse_cache",
workspace=self.lightrag.workspace,
@@ -257,62 +329,23 @@ class RAGAnything(QueryMixin, ProcessorMixin, BatchMixin):
)
await self.parse_cache.initialize()
-    # Initialize processors if not already done
-    if not self.modal_processors:
-        self._initialize_processors()
-    return
+    # Initialize processors after LightRAG is ready
+    self._initialize_processors()
+    self.logger.info(
+        "LightRAG, parse cache, and multimodal processors initialized"
+    )
+    return {"success": True}
-# Validate required functions for creating new LightRAG instance
-if self.llm_model_func is None:
-    raise ValueError(
-        "llm_model_func must be provided when LightRAG is not pre-initialized"
-    )
-if self.embedding_func is None:
-    raise ValueError(
-        "embedding_func must be provided when LightRAG is not pre-initialized"
-    )
+except Exception as e:
+    error_msg = f"Failed to initialize LightRAG instance: {str(e)}"
+    self.logger.error(error_msg, exc_info=True)
+    return {"success": False, "error": error_msg}
-from lightrag.kg.shared_storage import initialize_pipeline_status
-# Prepare LightRAG initialization parameters
-lightrag_params = {
-    "working_dir": self.working_dir,
-    "llm_model_func": self.llm_model_func,
-    "embedding_func": self.embedding_func,
-}
-# Merge user-provided lightrag_kwargs, which can override defaults
-lightrag_params.update(self.lightrag_kwargs)
-# Log the parameters being used for initialization (excluding sensitive data)
-log_params = {
-    k: v
-    for k, v in lightrag_params.items()
-    if not callable(v)
-    and k not in ["llm_model_kwargs", "vector_db_storage_cls_kwargs"]
-}
-self.logger.info(f"Initializing LightRAG with parameters: {log_params}")
-# Create LightRAG instance with merged parameters
-self.lightrag = LightRAG(**lightrag_params)
-await self.lightrag.initialize_storages()
-await initialize_pipeline_status()
-# Initialize parse cache storage using LightRAG's KV storage
-self.parse_cache = self.lightrag.key_string_value_json_storage_cls(
-    namespace="parse_cache",
-    workspace=self.lightrag.workspace,
-    global_config=self.lightrag.__dict__,
-    embedding_func=self.embedding_func,
-)
-await self.parse_cache.initialize()
-# Initialize processors after LightRAG is ready
-self._initialize_processors()
-self.logger.info("LightRAG, parse cache, and multimodal processors initialized")
+except Exception as e:
+    error_msg = f"Unexpected error during LightRAG initialization: {str(e)}"
+    self.logger.error(error_msg, exc_info=True)
+    return {"success": False, "error": error_msg}
async def finalize_storages(self):
"""Finalize all storages including parse cache and LightRAG storages
@@ -369,6 +402,17 @@ class RAGAnything(QueryMixin, ProcessorMixin, BatchMixin):
"""
return self.doc_parser.check_installation()
def verify_parser_installation_once(self) -> bool:
    """Check parser installation once and cache the result on the instance."""
    if not self._parser_installation_checked:
if not self.doc_parser.check_installation():
raise RuntimeError(
f"Parser '{self.config.parser}' is not properly installed. "
"Please install it using pip install or uv pip install."
)
self._parser_installation_checked = True
self.logger.info(f"Parser '{self.config.parser}' installation verified")
return True
def get_config_info(self) -> Dict[str, Any]:
"""Get current configuration information"""
config_info = {
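
Note: _ensure_lightrag_initialized now reports failure as a {"success": False, "error": ...} dict instead of raising, so callers branch on the result (as process_document_complete_lightrag_api does above). A minimal sketch of the contract, with a stand-in initializer:

import asyncio

async def ensure_ready(ok: bool) -> dict:
    # stand-in for _ensure_lightrag_initialized(): returns a status dict, never raises
    if not ok:
        return {"success": False, "error": "Parser 'mineru' is not properly installed."}
    return {"success": True}

async def caller() -> bool:
    result = await ensure_ready(ok=False)
    if not result["success"]:
        print("initialization failed:", result["error"])  # branch instead of catching
        return False
    return True

asyncio.run(caller())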

raganything/utils.py

@@ -174,6 +174,53 @@ async def insert_text_content(
logger.info("Text content insertion complete")
async def insert_text_content_with_multimodal_content(
lightrag,
input: str | list[str],
multimodal_content: list[dict[str, Any]] | None = None,
split_by_character: str | None = None,
split_by_character_only: bool = False,
ids: str | list[str] | None = None,
file_paths: str | list[str] | None = None,
scheme_name: str | None = None,
):
"""
Insert text content, together with any extracted multimodal items, into LightRAG
Args:
lightrag: LightRAG instance
input: Single document string or list of document strings
multimodal_content: Multimodal content list (optional)
split_by_character: if not None, split the text by this character; chunks longer than
    chunk_token_size are split again by token size.
split_by_character_only: if True, split only by the specified character; ignored when
    split_by_character is None.
ids: document ID string or list of unique document IDs; if not provided, MD5 hash IDs are generated
file_paths: single string of the file path or list of file paths, used for citation
scheme_name: scheme name (optional)
"""
logger.info("Starting text content insertion into LightRAG...")
# Use LightRAG's insert method with all parameters
try:
await lightrag.ainsert(
input=input,
multimodal_content=multimodal_content,
file_paths=file_paths,
split_by_character=split_by_character,
split_by_character_only=split_by_character_only,
ids=ids,
scheme_name=scheme_name,
)
except Exception as e:
    logger.error(f"Error: {e}")
    logger.error(
        "If the error is caused by the ainsert function not having a multimodal_content parameter, please update LightRAG to the raganything branch"
    )
    return
logger.info("Text content insertion complete")
def get_processor_for_type(modal_processors: Dict[str, Any], content_type: str):
"""
Get appropriate processor based on content type
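
Note: a hedged usage sketch for the new insert_text_content_with_multimodal_content helper; it requires a LightRAG build whose ainsert accepts multimodal_content (the raganything branch, per the except-branch hint above). The multimodal item shape shown is an assumption modeled on MinerU-style content_list entries:

from raganything.utils import insert_text_content_with_multimodal_content

async def demo(lightrag):  # an initialized LightRAG instance (raganything branch)
    await insert_text_content_with_multimodal_content(
        lightrag,
        input="Quarterly results...",
        multimodal_content=[{"type": "table", "table_body": "| q | revenue |", "page_idx": 3}],
        file_paths="report.pdf",
        ids="doc-abc123",  # stable ID; otherwise an MD5 hash ID is generated
        scheme_name="default",
    )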