Skip to content

Commit

Permalink
Merge pull request #553 from GurjotSinghShorthillsAI/custom-chunking-…
Browse files Browse the repository at this point in the history
…feature

Implement custom chunking feature
  • Loading branch information
LarFii authored Jan 9, 2025
2 parents 9e7784a + 9565a46 commit 7973f46
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions lightrag/lightrag.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,73 @@ async def ainsert(self, string_or_strings, split_by_character):
# Ensure all indexes are updated after each document
await self._insert_done()

def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
    """Blocking entry point for inserting a document with pre-split chunks.

    Obtains an event loop from ``always_get_an_event_loop`` and runs
    ``ainsert_custom_chunks`` on it until completion.

    Args:
        full_text: The complete document text.
        text_chunks: The caller-provided chunks of that document.

    Returns:
        Whatever ``ainsert_custom_chunks`` returns.
    """
    event_loop = always_get_an_event_loop()
    pending_insert = self.ainsert_custom_chunks(full_text, text_chunks)
    return event_loop.run_until_complete(pending_insert)

async def ainsert_custom_chunks(self, full_text: str, text_chunks: list[str]):
    """Insert one document using chunks supplied by the caller.

    The document and each chunk are stripped of surrounding whitespace and
    keyed by ``compute_mdhash_id`` (prefixes ``"doc-"`` / ``"chunk-"``).
    Only keys returned by the storages' ``filter_keys`` are processed; when
    the document key is not returned the call is a no-op apart from a
    warning.

    Args:
        full_text: The complete document text.
        text_chunks: Chunks of the document. Chunks that are empty after
            stripping are ignored; chunks with identical stripped content
            collapse into a single entry because they share a key.

    Returns:
        None.

    Note:
        ``chunks_vdb`` is upserted before entity extraction. If
        ``extract_entities`` returns ``None`` the method returns without
        writing ``full_docs`` or ``text_chunks``.
    """
    # Becomes True once the document is accepted as new; from then on the
    # ``finally`` clause calls ``_insert_done`` however the method exits.
    update_storage = False
    try:
        doc_content = full_text.strip()
        doc_key = compute_mdhash_id(doc_content, prefix="doc-")
        new_docs = {doc_key: {"content": doc_content}}

        _add_doc_keys = await self.full_docs.filter_keys([doc_key])
        new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
        if not new_docs:
            logger.warning("This document is already in the storage.")
            return

        update_storage = True
        logger.info(f"[New Docs] inserting {len(new_docs)} docs")

        inserting_chunks = {}
        for chunk_text in text_chunks:
            chunk_text_stripped = chunk_text.strip()
            if not chunk_text_stripped:
                # An empty chunk carries nothing to embed or extract from.
                continue
            chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-")
            inserting_chunks[chunk_key] = {
                "content": chunk_text_stripped,
                "full_doc_id": doc_key,
            }

        if not inserting_chunks:
            # Distinguish "nothing usable was provided" from "already stored"
            # and avoid querying the storage with an empty key list.
            logger.warning("No non-empty chunks were provided.")
            return

        _add_chunk_keys = await self.text_chunks.filter_keys(
            list(inserting_chunks)
        )
        inserting_chunks = {
            k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys
        }
        if not inserting_chunks:
            logger.warning("All chunks are already in the storage.")
            return

        logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")

        await self.chunks_vdb.upsert(inserting_chunks)

        logger.info("[Entity Extraction]...")
        maybe_new_kg = await extract_entities(
            inserting_chunks,
            knowledge_graph_inst=self.chunk_entity_relation_graph,
            entity_vdb=self.entities_vdb,
            relationships_vdb=self.relationships_vdb,
            global_config=asdict(self),
        )

        if maybe_new_kg is None:
            logger.warning("No new entities and relationships found")
            return

        self.chunk_entity_relation_graph = maybe_new_kg

        # The document and its chunks are recorded only after extraction
        # has produced a graph.
        await self.full_docs.upsert(new_docs)
        await self.text_chunks.upsert(inserting_chunks)

    finally:
        if update_storage:
            await self._insert_done()

async def _insert_done(self):
tasks = []
for storage_inst in [
Expand Down

0 comments on commit 7973f46

Please sign in to comment.