diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 7496d736..6af29aa2 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -458,6 +458,73 @@ async def ainsert(self, string_or_strings, split_by_character): # Ensure all indexes are updated after each document await self._insert_done() + def insert_custom_chunks(self, full_text: str, text_chunks: list[str]): + loop = always_get_an_event_loop() + return loop.run_until_complete( + self.ainsert_custom_chunks(full_text, text_chunks) + ) + + async def ainsert_custom_chunks(self, full_text: str, text_chunks: list[str]): + update_storage = False + try: + doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-") + new_docs = {doc_key: {"content": full_text.strip()}} + + _add_doc_keys = await self.full_docs.filter_keys([doc_key]) + new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys} + if not len(new_docs): + logger.warning("This document is already in the storage.") + return + + update_storage = True + logger.info(f"[New Docs] inserting {len(new_docs)} docs") + + inserting_chunks = {} + for chunk_text in text_chunks: + chunk_text_stripped = chunk_text.strip() + chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-") + + inserting_chunks[chunk_key] = { + "content": chunk_text_stripped, + "full_doc_id": doc_key, + } + + _add_chunk_keys = await self.text_chunks.filter_keys( + list(inserting_chunks.keys()) + ) + inserting_chunks = { + k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys + } + if not len(inserting_chunks): + logger.warning("All chunks are already in the storage.") + return + + logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks") + + await self.chunks_vdb.upsert(inserting_chunks) + + logger.info("[Entity Extraction]...") + maybe_new_kg = await extract_entities( + inserting_chunks, + knowledge_graph_inst=self.chunk_entity_relation_graph, + entity_vdb=self.entities_vdb, + relationships_vdb=self.relationships_vdb, + global_config=asdict(self), + ) + + if maybe_new_kg is None: + logger.warning("No new entities and relationships found") + return + else: + self.chunk_entity_relation_graph = maybe_new_kg + + await self.full_docs.upsert(new_docs) + await self.text_chunks.upsert(inserting_chunks) + + finally: + if update_storage: + await self._insert_done() + async def _insert_done(self): tasks = [] for storage_inst in [