corrected the linting errors

This commit is contained in:
MinalMahalaShorthillsAI
2025-07-24 15:07:33 +05:30
parent 5b44298214
commit 1764e1ee8d
4 changed files with 153 additions and 263 deletions

View File

@@ -332,4 +332,3 @@ def main():
if __name__ == "__main__":
    main()

View File

@@ -18,14 +18,6 @@ if TYPE_CHECKING:
class BatchMixin:
    """BatchMixin class containing batch processing functionality for RAGAnything"""

    # Type hints for mixin attributes (will be available when mixed into RAGAnything)
    config: "RAGAnythingConfig"
    logger: logging.Logger

    # Type hints for methods that will be available from other mixins
    async def _ensure_lightrag_initialized(self) -> None: ...
    async def process_document_complete(self, file_path: str, **kwargs) -> None: ...
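
These stub annotations are a standard way to keep a mixin type-checkable: the mixin declares the attributes and coroutines its host class will provide, and the `TYPE_CHECKING` import avoids circular imports at runtime. A minimal self-contained sketch of the same technique; the module path and class names here are hypothetical, not part of this commit:

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only for type checking; avoids a circular import at runtime
    from raganything.config import RAGAnythingConfig  # module path is an assumption

class ExampleMixin:
    """Hypothetical mixin using the same stub technique as BatchMixin."""

    # Attributes the host class provides at runtime
    config: "RAGAnythingConfig"
    logger: logging.Logger

    # Stub for a coroutine supplied by another mixin; `...` satisfies type checkers
    async def _ensure_lightrag_initialized(self) -> None: ...

    async def do_work(self) -> None:
        await self._ensure_lightrag_initialized()
        self.logger.info("config loaded: %s", type(self.config).__name__)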
@@ -41,227 +33,120 @@ class BatchMixin:
    async def process_folder_complete(
        self,
        folder_path: str,
        output_dir: str = None,
        parse_method: str = None,
        display_stats: bool = None,
        split_by_character: str | None = None,
        split_by_character_only: bool = False,
        file_extensions: Optional[List[str]] = None,
        recursive: bool = None,
        max_workers: int = None,
    ):
        """
        Process all supported files in a folder

        Args:
            folder_path: Path to the folder containing files to process
            output_dir: Directory for parsed outputs (optional)
            parse_method: Parsing method to use (optional)
            display_stats: Whether to display statistics (optional)
            split_by_character: Character to split by (optional)
            split_by_character_only: Whether to split only by character (optional)
            file_extensions: List of file extensions to process (optional)
            recursive: Whether to process folders recursively (optional)
            max_workers: Maximum number of workers for concurrent processing (optional)
        """
        # Use config defaults if not provided
        if output_dir is None:
            output_dir = self.config.parser_output_dir
        if parse_method is None:
            parse_method = self.config.parse_method
        if display_stats is None:
            display_stats = True
        if file_extensions is None:
            file_extensions = self.config.supported_file_extensions
        if recursive is None:
            recursive = self.config.recursive_folder_processing
        if max_workers is None:
            max_workers = self.config.max_concurrent_files

        # Ensure LightRAG is initialized
        await self._ensure_lightrag_initialized()

        # Get all files in the folder
        folder_path_obj = Path(folder_path)
        if not folder_path_obj.exists():
            raise FileNotFoundError(f"Folder not found: {folder_path}")

        # Collect files based on supported extensions
        files_to_process = []
        for file_ext in file_extensions:
            if recursive:
                pattern = f"**/*{file_ext}"
            else:
                pattern = f"*{file_ext}"
            files_to_process.extend(folder_path_obj.glob(pattern))

        if not files_to_process:
            self.logger.warning(f"No supported files found in {folder_path}")
            return

        self.logger.info(
            f"Found {len(files_to_process)} files to process in {folder_path}"
        )

        # Create output directory if it doesn't exist
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # Process files with controlled concurrency
        semaphore = asyncio.Semaphore(max_workers)
        tasks = []

        async def process_single_file(file_path: Path):
            async with semaphore:
                try:
                    # Process the document with RAG
                    await self.process_document_complete(
                        str(file_path),
                        output_dir=output_dir,
                        parse_method=parse_method,
                        split_by_character=split_by_character,
                        split_by_character_only=split_by_character_only,
                    )
                    return True, str(file_path), None
                except Exception as e:
                    self.logger.error(f"Failed to process {file_path}: {str(e)}")
                    return False, str(file_path), str(e)

        # Create tasks for all files
        for file_path in files_to_process:
            task = asyncio.create_task(process_single_file(file_path))
            tasks.append(task)

        # Wait for all tasks to complete
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        successful_files = []
        failed_files = []
        for result in results:
            if isinstance(result, Exception):
                failed_files.append(("unknown", str(result)))
            else:
                success, file_path, error = result
                if success:
                    successful_files.append(file_path)
                else:
                    failed_files.append((file_path, error))

        # Display statistics if requested
        if display_stats:
            self.logger.info("Processing complete!")
            self.logger.info(f"  Successful: {len(successful_files)} files")
            self.logger.info(f"  Failed: {len(failed_files)} files")
            if failed_files:
                self.logger.warning("Failed files:")
                for file_path, error in failed_files:
                    self.logger.warning(f"  - {file_path}: {error}")

    # ==========================================
    # NEW ENHANCED BATCH PROCESSING METHODS
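
For orientation, a minimal usage sketch of the folder API restored above; the import path and constructor arguments are assumptions, not part of this commit:

import asyncio

from raganything import RAGAnything  # import path is an assumption

async def main():
    rag = RAGAnything()  # constructor arguments elided
    # Unset parameters fall back to rag.config defaults, as shown above
    await rag.process_folder_complete(
        folder_path="./docs",
        output_dir="./parsed_output",
        file_extensions=[".pdf", ".docx"],
        recursive=True,
        max_workers=4,
    )

if __name__ == "__main__":
    asyncio.run(main())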
@@ -278,42 +163,40 @@ class BatchMixin:
        **kwargs,
    ) -> BatchProcessingResult:
        """
        Process multiple documents in batch using the new BatchParser

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            max_workers: Maximum number of workers for parallel processing
            recursive: Whether to process directories recursively
            show_progress: Whether to show progress bar
            **kwargs: Additional arguments passed to the parser

        Returns:
            BatchProcessingResult: Results of the batch processing
        """
        # Use config defaults if not specified
        if output_dir is None:
            output_dir = self.config.parser_output_dir
        if parse_method is None:
            parse_method = self.config.parse_method
        if max_workers is None:
            max_workers = self.config.max_concurrent_files
        if recursive is None:
            recursive = self.config.recursive_folder_processing

        # Create batch parser
        batch_parser = BatchParser(
            parser_type=self.config.parser,
            max_workers=max_workers,
            show_progress=show_progress,
            timeout_per_file=300,  # 5 minutes per file
            skip_installation_check=True,  # Skip installation check for better UX
        )

        # Process batch
        return batch_parser.process_batch(
            file_paths=file_paths,
            output_dir=output_dir,
            parse_method=parse_method,
@@ -321,9 +204,6 @@ class BatchMixin:
            **kwargs,
        )

    async def process_documents_batch_async(
        self,
        file_paths: List[str],
@@ -335,42 +215,40 @@ class BatchMixin:
        **kwargs,
    ) -> BatchProcessingResult:
        """
        Asynchronously process multiple documents in batch

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            max_workers: Maximum number of workers for parallel processing
            recursive: Whether to process directories recursively
            show_progress: Whether to show progress bar
            **kwargs: Additional arguments passed to the parser

        Returns:
            BatchProcessingResult: Results of the batch processing
        """
        # Use config defaults if not specified
        if output_dir is None:
            output_dir = self.config.parser_output_dir
        if parse_method is None:
            parse_method = self.config.parse_method
        if max_workers is None:
            max_workers = self.config.max_concurrent_files
        if recursive is None:
            recursive = self.config.recursive_folder_processing

        # Create batch parser
        batch_parser = BatchParser(
            parser_type=self.config.parser,
            max_workers=max_workers,
            show_progress=show_progress,
            timeout_per_file=300,  # 5 minutes per file
            skip_installation_check=True,  # Skip installation check for better UX
        )

        # Process batch asynchronously
        return await batch_parser.process_batch_async(
            file_paths=file_paths,
            output_dir=output_dir,
            parse_method=parse_method,
@@ -378,9 +256,6 @@ class BatchMixin:
            **kwargs,
        )

    def get_supported_file_extensions(self) -> List[str]:
        """Get list of supported file extensions for batch processing"""
        batch_parser = BatchParser(parser_type=self.config.parser)
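
As a usage sketch (not from this commit), the batch entry points above can be driven like this; `rag` is assumed to be a constructed RAGAnything instance and the paths are illustrative:

async def run_batches(rag):
    # Synchronous parsing entry point (runs in the calling thread)
    result = rag.process_documents_batch(
        file_paths=["./docs", "./reports/q3.pdf"],
        max_workers=4,
    )
    print(result.summary())

    # Async variant; same arguments, awaitable
    result = await rag.process_documents_batch_async(
        file_paths=["./docs"],
        recursive=True,
    )
    print(result.summary())
    print(rag.get_supported_file_extensions())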
@@ -393,8 +268,8 @@ class BatchMixin:
        Filter file paths to only include supported file types

        Args:
            file_paths: List of file paths to filter
            recursive: Whether to process directories recursively

        Returns:
            List of supported file paths
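
A short sketch of the filter helper, again assuming `rag` is a constructed RAGAnything instance:

candidates = ["./docs", "notes.txt", "slides.pptx", "archive.zip"]
supported = rag.filter_supported_files(candidates, recursive=True)
# Unsupported types such as .zip are dropped; directories are expanded
# when `recursive` is True.
for path in supported:
    print(path)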
@@ -416,26 +291,39 @@ class BatchMixin:
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Process documents in batch and then add them to RAG

        This method combines document parsing and RAG insertion:
        1. First, parse all documents using batch processing
        2. Then, process each successfully parsed document with RAG

        Args:
            file_paths: List of file paths or directories to process
            output_dir: Output directory for parsed files
            parse_method: Parsing method to use
            max_workers: Maximum number of workers for parallel processing
            recursive: Whether to process directories recursively
            show_progress: Whether to show progress bar
            **kwargs: Additional arguments passed to the parser

        Returns:
            Dict containing both parse results and RAG processing results
        """
        # Use config defaults if not specified
        if output_dir is None:
            output_dir = self.config.parser_output_dir
        if parse_method is None:
            parse_method = self.config.parse_method
        if max_workers is None:
            max_workers = self.config.max_concurrent_files
        if recursive is None:
            recursive = self.config.recursive_folder_processing

        self.logger.info("Starting batch processing with RAG integration")

        # Step 1: Parse documents in batch
        parse_result = self.process_documents_batch(
            file_paths=file_paths,
            output_dir=output_dir,
@@ -446,6 +334,10 @@ class BatchMixin:
            **kwargs,
        )

        # Step 2: Process with RAG
        # Initialize RAG system
        await self._ensure_lightrag_initialized()

        # Then, process each successful file with RAG
        rag_results = {}
@@ -457,9 +349,9 @@ class BatchMixin:
        # Process files with RAG (this could be parallelized in the future)
        for file_path in parse_result.successful_files:
            try:
                # Process the successfully parsed file with RAG
                await self.process_document_complete(
                    file_path,
                    output_dir=output_dir,
                    parse_method=parse_method,
                    **kwargs,
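
A sketch of driving this combined parse-then-RAG entry point; the result keys follow the dictionary shown earlier in this diff and should be treated as an assumption about the current return shape:

results = await rag.process_documents_with_rag_batch(
    file_paths=["./papers"],
    recursive=True,
    max_workers=2,
)
print(results["parse_result"].summary())            # parsing statistics
print(results["successful_rag_files"], "inserted")  # files added to RAG
print(results["failed_rag_files"], "failed")        # files that failed RAG insertion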

View File

@@ -20,12 +20,14 @@ import subprocess
try:
    import markdown

    MARKDOWN_AVAILABLE = True
except ImportError:
    MARKDOWN_AVAILABLE = False

try:
    from weasyprint import HTML

    WEASYPRINT_AVAILABLE = True
except ImportError:
    WEASYPRINT_AVAILABLE = False
@@ -33,6 +35,7 @@ except ImportError:
try:
    # Check if pandoc module exists (not used directly, just for detection)
    import importlib.util

    spec = importlib.util.find_spec("pandoc")
    PANDOC_AVAILABLE = spec is not None
except ImportError:
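
The guarded imports above are a feature-detection pattern: set an availability flag once, then branch on it instead of retrying the import. The same idea, sketched for a hypothetical optional dependency (python-docx), which is not one of this file's imports:

import importlib.util

DOCX_AVAILABLE = importlib.util.find_spec("docx") is not None  # python-docx

def load_docx_text(path: str) -> str:
    if not DOCX_AVAILABLE:
        raise RuntimeError("python-docx is required for .docx parsing")
    import docx  # safe: guarded by the availability flag above

    return "\n".join(p.text for p in docx.Document(path).paragraphs)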

View File

@@ -5,21 +5,17 @@ lightrag-hku
# Enhanced markdown conversion (optional)
markdown
# MinerU 2.0 packages (replaces magic-pdf)
mineru[core]
pygments
# Progress bars for batch processing
tqdm
weasyprint
# Note: Optional dependencies are now defined in setup.py extras_require:
# - [image]: Pillow>=10.0.0 (for BMP, TIFF, GIF, WebP format conversion)