Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 新增ini文件读取数据库配置方式,方便生产环境,修改Lightrag ainsert方法_add_doc_keys获取方式,原… #631

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Optional database back-ends for LightRAG. The server only activates a
# back-end when the section's `uri` value is non-empty.
#
# NOTE(review): configparser reads `uri = #` as the literal string "#"
# (inline comments are disabled by default), which is truthy — to leave a
# back-end disabled, remove the section rather than keeping the `#`
# placeholder. Replace `#` with a real URI to enable it.

# Redis — used for the KV store and the doc-status store.
[redis]
uri = redis://localhost:6379

# Neo4j — graph storage (e.g. neo4j://host:7687; `#` is a placeholder).
[neo4j]
uri = #
username = neo4j
password = 12345678

# Milvus — vector storage; db_name selects the Milvus database.
[milvus]
uri = #
user = root
password = Milvus
db_name = lightrag
90 changes: 86 additions & 4 deletions lightrag/api/lightrag_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import aiofiles
from ascii_colors import trace_exception, ASCIIColors
import os
import configparser

from fastapi import Depends, Security
from fastapi.security import APIKeyHeader
Expand Down Expand Up @@ -57,6 +58,52 @@ def estimate_tokens(text: str) -> int:
LIGHTRAG_CREATED_AT = "2024-01-15T00:00:00Z"
LIGHTRAG_DIGEST = "sha256:lightrag"

# Default (file-based) storage back-ends. Each is overridden below when the
# corresponding section is present in config.ini.
KV_STORAGE = "JsonKVStorage"
DOC_STATUS_STORAGE = "JsonDocStatusStorage"
GRAPH_STORAGE = "NetworkXStorage"
VECTOR_STORAGE = "NanoVectorDBStorage"

# Read optional database configuration from config.ini. A missing file or
# missing section silently leaves the defaults above in place.
config = configparser.ConfigParser()
config.read("config.ini")

# Redis config — backs both the KV store and the doc-status store.
redis_uri = config.get("redis", "uri", fallback=None)
if redis_uri:
    os.environ["REDIS_URI"] = redis_uri
    KV_STORAGE = "RedisKVStorage"
    DOC_STATUS_STORAGE = "RedisKVStorage"

# Neo4j config. Credentials are exported only when present: os.environ
# values must be strings, so assigning a None credential would raise
# TypeError before the server even starts.
neo4j_uri = config.get("neo4j", "uri", fallback=None)
neo4j_username = config.get("neo4j", "username", fallback=None)
neo4j_password = config.get("neo4j", "password", fallback=None)
if neo4j_uri:
    os.environ["NEO4J_URI"] = neo4j_uri
    if neo4j_username:
        os.environ["NEO4J_USERNAME"] = neo4j_username
    if neo4j_password:
        os.environ["NEO4J_PASSWORD"] = neo4j_password
    GRAPH_STORAGE = "Neo4JStorage"

# Milvus config.
milvus_uri = config.get("milvus", "uri", fallback=None)
milvus_user = config.get("milvus", "user", fallback=None)
milvus_password = config.get("milvus", "password", fallback=None)
milvus_db_name = config.get("milvus", "db_name", fallback=None)
if milvus_uri:
    os.environ["MILVUS_URI"] = milvus_uri
    if milvus_user:
        os.environ["MILVUS_USER"] = milvus_user
    if milvus_password:
        os.environ["MILVUS_PASSWORD"] = milvus_password
    if milvus_db_name:
        os.environ["MILVUS_DB_NAME"] = milvus_db_name
    # NOTE: "MilvusVectorDBStorge" (sic) matches the storage class name
    # registered upstream — do not "fix" the spelling here.
    VECTOR_STORAGE = "MilvusVectorDBStorge"

# MongoDB config. Fixed: the original called
# config.get("mongodb", "LightRAG", fallback=None), i.e. it looked up an
# option literally named "LightRAG" and could write None into os.environ.
# The option name is "database"; "LightRAG" is presumably the intended
# default database name — TODO confirm against the MongoDB storage class.
mongo_uri = config.get("mongodb", "uri", fallback=None)
mongo_database = config.get("mongodb", "database", fallback="LightRAG")
if mongo_uri:
    os.environ["MONGO_URI"] = mongo_uri
    os.environ["MONGO_DATABASE"] = mongo_database
    KV_STORAGE = "MongoKVStorage"
    DOC_STATUS_STORAGE = "MongoKVStorage"


def get_default_host(binding_type: str) -> str:
default_hosts = {
Expand Down Expand Up @@ -337,6 +384,18 @@ def parse_args() -> argparse.Namespace:
help="Embedding model name (default: from env or bge-m3:latest)",
)

# Chunking parameters. type=int makes argparse deliver integers directly
# (CLI values arrive as strings otherwise); downstream int(...) casts still
# work. Fixed: the overlap help text wrongly claimed "default 1200".
parser.add_argument(
    "--chunk_size",
    type=int,
    default=1200,
    help="Chunk token size (default: 1200)",
)

parser.add_argument(
    "--chunk_overlap_size",
    type=int,
    default=100,
    help="Chunk overlap token size (default: 100)",
)

def timeout_type(value):
if value is None or value == "None":
return None
Expand Down Expand Up @@ -551,7 +610,14 @@ async def api_key_auth(api_key_header_value: str | None = Security(api_key_heade

def create_app(args):
    # Verify that the bindings are correctly set up
if args.llm_binding not in ["lollms", "ollama", "openai", "azure_openai"]:

if args.llm_binding not in [
"lollms",
"ollama",
"openai",
"openai-ollama",
"azure_openai",
]:
raise Exception("llm binding not supported")

if args.embedding_binding not in ["lollms", "ollama", "openai", "azure_openai"]:
Expand Down Expand Up @@ -692,33 +758,49 @@ async def azure_openai_model_complete(
)

# Initialize RAG
if args.llm_binding in ["lollms", "ollama"]:
if args.llm_binding in ["lollms", "ollama", "openai-ollama"]:
rag = LightRAG(
working_dir=args.working_dir,
llm_model_func=lollms_model_complete
if args.llm_binding == "lollms"
else ollama_model_complete,
else ollama_model_complete
if args.llm_binding == "ollama"
else openai_alike_model_complete,
llm_model_name=args.llm_model,
llm_model_max_async=args.max_async,
llm_model_max_token_size=args.max_tokens,
chunk_token_size=int(args.chunk_size),
chunk_overlap_token_size=int(args.chunk_overlap_size),
llm_model_kwargs={
"host": args.llm_binding_host,
"timeout": args.timeout,
"options": {"num_ctx": args.max_tokens},
"api_key": args.llm_binding_api_key,
},
}
if args.llm_binding == "lollms" or args.llm_binding == "ollama"
else {},
embedding_func=embedding_func,
kv_storage=KV_STORAGE,
graph_storage=GRAPH_STORAGE,
vector_storage=VECTOR_STORAGE,
doc_status_storage=DOC_STATUS_STORAGE,
)
else:
rag = LightRAG(
working_dir=args.working_dir,
llm_model_func=azure_openai_model_complete
if args.llm_binding == "azure_openai"
else openai_alike_model_complete,
chunk_token_size=int(args.chunk_size),
chunk_overlap_token_size=int(args.chunk_overlap_size),
llm_model_name=args.llm_model,
llm_model_max_async=args.max_async,
llm_model_max_token_size=args.max_tokens,
embedding_func=embedding_func,
kv_storage=KV_STORAGE,
graph_storage=GRAPH_STORAGE,
vector_storage=VECTOR_STORAGE,
doc_status_storage=DOC_STATUS_STORAGE,
)

async def index_file(file_path: Union[str, Path]) -> None:
Expand Down
12 changes: 9 additions & 3 deletions lightrag/lightrag.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,13 @@ async def ainsert(
}

# 3. Filter out already processed documents
_add_doc_keys = await self.doc_status.filter_keys(list(new_docs.keys()))
# _add_doc_keys = await self.doc_status.filter_keys(list(new_docs.keys()))
_add_doc_keys = {
doc_id
for doc_id in new_docs.keys()
if (current_doc := await self.doc_status.get_by_id(doc_id)) is None
or current_doc["status"] == DocStatus.FAILED
}
new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}

if not new_docs:
Expand Down Expand Up @@ -573,7 +579,7 @@ async def apipeline_process_documents(self, string_or_strings):
_not_stored_doc_keys = await self.full_docs.filter_keys(list(new_docs.keys()))
if len(_not_stored_doc_keys) < len(new_docs):
logger.info(
f"Skipping {len(new_docs)-len(_not_stored_doc_keys)} already existing documents"
f"Skipping {len(new_docs) - len(_not_stored_doc_keys)} already existing documents"
)
new_docs = {k: v for k, v in new_docs.items() if k in _not_stored_doc_keys}

Expand Down Expand Up @@ -618,7 +624,7 @@ async def apipeline_process_chunks(self):
batch_docs = dict(list(new_docs.items())[i : i + batch_size])
for doc_id, doc in tqdm_async(
batch_docs.items(),
desc=f"Level 1 - Spliting doc in batch {i//batch_size + 1}",
desc=f"Level 1 - Spliting doc in batch {i // batch_size + 1}",
):
try:
# Generate chunks from document
Expand Down
3 changes: 3 additions & 0 deletions lightrag/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ async def upsert(self, data: dict[str, dict]):
await self.index_done_callback()
return data

async def get_by_id(self, id: str):
    """Look up a single stored record by its key.

    Returns the raw stored value, or None when the key is absent.
    """
    try:
        return self._data[id]
    except KeyError:
        return None

async def get(self, doc_id: str) -> Union[DocProcessingStatus, None]:
    """Fetch the processing-status record stored under *doc_id*.

    Returns None when no status has been recorded for the document.
    """
    try:
        return self._data[doc_id]
    except KeyError:
        return None
Expand Down
7 changes: 7 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ aiofiles
aiohttp
aioredis
asyncpg
# configparser is part of the Python 3 standard library — no install needed
# (the PyPI "configparser" package is a Python 2 backport)

# database packages
graspologic
Expand All @@ -17,13 +18,19 @@ numpy
ollama
openai
oracledb
pipmaster
psycopg-pool
psycopg[binary,pool]
pydantic
pymilvus
pymongo
pymysql


PyPDF2
python-docx
python-dotenv
python-pptx
pyvis
setuptools
# lmdeploy[all]
Expand Down
Loading