Update processor.py
This commit is contained in:
@@ -802,9 +802,9 @@ class ProcessorMixin:
|
|||||||
# Stage 3: Store chunks to LightRAG storage
|
# Stage 3: Store chunks to LightRAG storage
|
||||||
await self._store_chunks_to_lightrag_storage_type_aware(lightrag_chunks)
|
await self._store_chunks_to_lightrag_storage_type_aware(lightrag_chunks)
|
||||||
|
|
||||||
# Stage 3.5: Store multimodal main entities to entities_vdb
|
# Stage 3.5: Store multimodal main entities to entities_vdb and full_entities
|
||||||
await self._store_multimodal_main_entities(
|
await self._store_multimodal_main_entities(
|
||||||
multimodal_data_list, lightrag_chunks, file_path
|
multimodal_data_list, lightrag_chunks, file_path, doc_id
|
||||||
)
|
)
|
||||||
|
|
||||||
# Track chunk IDs for doc_status update
|
# Track chunk IDs for doc_status update
|
||||||
@@ -966,14 +966,17 @@ class ProcessorMixin:
|
|||||||
multimodal_data_list: List[Dict[str, Any]],
|
multimodal_data_list: List[Dict[str, Any]],
|
||||||
lightrag_chunks: Dict[str, Any],
|
lightrag_chunks: Dict[str, Any],
|
||||||
file_path: str,
|
file_path: str,
|
||||||
|
doc_id: str = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Store multimodal main entities to entities_vdb.
|
Store multimodal main entities to entities_vdb and full_entities.
|
||||||
This ensures that entities like "TableName (table)" are properly indexed.
|
This ensures that entities like "TableName (table)" are properly indexed.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
multimodal_data_list: List of processed multimodal data with entity info
|
multimodal_data_list: List of processed multimodal data with entity info
|
||||||
lightrag_chunks: Chunks in LightRAG format (already formatted with templates)
|
lightrag_chunks: Chunks in LightRAG format (already formatted with templates)
|
||||||
|
file_path: File path for the entities
|
||||||
|
doc_id: Document ID for full_entities storage
|
||||||
"""
|
"""
|
||||||
if not multimodal_data_list:
|
if not multimodal_data_list:
|
||||||
return
|
return
|
||||||
@@ -1035,14 +1038,79 @@ class ProcessorMixin:
|
|||||||
await self.lightrag.entities_vdb.upsert(entities_to_store)
|
await self.lightrag.entities_vdb.upsert(entities_to_store)
|
||||||
await self.lightrag.entities_vdb.index_done_callback()
|
await self.lightrag.entities_vdb.index_done_callback()
|
||||||
|
|
||||||
|
# NEW: Store multimodal main entities in full_entities storage
|
||||||
|
if doc_id and self.lightrag.full_entities:
|
||||||
|
await self._store_multimodal_entities_to_full_entities(
|
||||||
|
entities_to_store, doc_id
|
||||||
|
)
|
||||||
|
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
f"Stored {len(entities_to_store)} multimodal main entities to knowledge graph and entities_vdb"
|
f"Stored {len(entities_to_store)} multimodal main entities to knowledge graph, entities_vdb, and full_entities"
|
||||||
)
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Error storing multimodal main entities: {e}")
|
self.logger.error(f"Error storing multimodal main entities: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
async def _store_multimodal_entities_to_full_entities(
    self, entities_to_store: Dict[str, Any], doc_id: str
):
    """
    Store multimodal main entities to full_entities storage.

    Merges the names of the given entities into the per-document record
    held in ``self.lightrag.full_entities`` under ``doc_id`` and writes the
    record back. The record has three keys: ``entity_names`` (list of
    names), ``count`` (length of that list) and ``update_time`` (integer
    Unix timestamp taken at write time).

    Names already present in the stored record keep their position; new
    names are appended in the iteration order of ``entities_to_store``.
    Duplicates are dropped in both the "new record" and "existing record"
    cases, so ``count`` always equals the number of distinct names.

    Args:
        entities_to_store: Dictionary of entities to store; every value
            must provide an "entity_name" key
        doc_id: Document ID for grouping entities

    Raises:
        Exception: Any error raised while reading or writing the storage
            (or a missing "entity_name" key) is logged and re-raised.
    """
    try:
        # Get current full_entities data for this document
        current_doc_entities = await self.lightrag.full_entities.get_by_id(doc_id)

        if current_doc_entities is None:
            # No record yet for this document: start from an empty list
            existing_entity_names = []
        else:
            # Tolerate a record whose "entity_names" is missing or None
            existing_entity_names = current_doc_entities.get("entity_names") or []

        new_entity_names = [
            entity_data["entity_name"]
            for entity_data in entities_to_store.values()
        ]

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # persisted list is deterministic (a set would scramble the order).
        merged_entity_names = list(
            dict.fromkeys([*existing_entity_names, *new_entity_names])
        )

        doc_entities_data = {
            "entity_names": merged_entity_names,
            "count": len(merged_entity_names),
            "update_time": int(time.time()),
        }

        # Store updated data
        await self.lightrag.full_entities.upsert({doc_id: doc_entities_data})
        await self.lightrag.full_entities.index_done_callback()

        self.logger.debug(
            f"Added {len(entities_to_store)} multimodal main entities to full_entities for doc {doc_id}"
        )

    except Exception as e:
        self.logger.error(
            f"Error storing multimodal entities to full_entities: {e}"
        )
        raise
|
||||||
|
|
||||||
async def _batch_extract_entities_lightrag_style_type_aware(
|
async def _batch_extract_entities_lightrag_style_type_aware(
|
||||||
self, lightrag_chunks: Dict[str, Any]
|
self, lightrag_chunks: Dict[str, Any]
|
||||||
) -> List[Tuple]:
|
) -> List[Tuple]:
|
||||||
|
|||||||
Reference in New Issue
Block a user