Docsite rag #176

Open · wants to merge 19 commits into main
5 changes: 5 additions & 0 deletions .changeset/plenty-lizards-sell.md
@@ -0,0 +1,5 @@
---
"apollo": minor
---

Add new docsite rag
38 changes: 37 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -25,6 +25,7 @@ langchain-community = "^0.3.15"
langchain-openai = "^0.3.1"
datasets = "^3.2.0"
httpx = "0.27.0"
nltk = "^3.9.1"
[tool.poetry.group.ft]
optional = true

38 changes: 38 additions & 0 deletions services/embed_docsite/README.md
@@ -0,0 +1,38 @@
## Embed Docsite (RAG)

This service embeds the OpenFn documentation into a vector database (Pinecone). It downloads the documentation, chunks it, processes metadata, generates embeddings (OpenAI), and uploads the result to the database.

## Usage - Embedding OpenFn Documentation

The vector database used here is Pinecone. To obtain the required environment variables, follow these steps:

1. Create an account on Pinecone and set up a free cluster.
2. Obtain the URL and API key for the cluster and add them to the `.env` file.
3. You'll also need an OpenAI API key to generate embeddings. A quick check for these variables is sketched below.
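A minimal sanity check for these credentials, assuming the variable name used by the code in this PR (`PINECONE_API_KEY`) and the default variable read by `langchain_openai` (`OPENAI_API_KEY`):

```python
import os

# PINECONE_API_KEY is read by DocsiteIndexer; OPENAI_API_KEY is the default
# variable used by OpenAIEmbeddings.
for var in ("PINECONE_API_KEY", "OPENAI_API_KEY"):
    if not os.environ.get(var):
        raise RuntimeError(f"Missing environment variable: {var}")
```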

### With the CLI, returning to stdout:

```bash
openfn apollo embed_docsite tmp/payload.json
```
To run directly from this repo (note that the server must be started):

```bash
bun py embed_docsite tmp/payload.json -O
```
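Not part of the repo: one hypothetical way to generate `tmp/payload.json` for the calls above, using the fields described in the Payload Reference section below:

```python
import json
import os

# Example payload; see the Payload Reference section for field meanings.
payload = {
    "docs_to_upload": ["adaptor_docs", "general_docs", "adaptor_functions"],
    "collection_name": "Docsite-20250225",
}

os.makedirs("tmp", exist_ok=True)
with open("tmp/payload.json", "w") as f:
    json.dump(payload, f, indent=2)
```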

## Implementation
The service uses the DocsiteProcessor to download the documentation and chunk it into smaller parts. The DocsiteIndexer formats metadata, creates a new collection, embeds the chunked texts (OpenAI) and uploads them into the vector database (Pinecone).

The chunked texts can be viewed in `tmp/split_sections`.
> **Collaborator comment:** Ooh nice - this is still true? I'd love to run a test tomorrow and have a look, just to see what's going on.

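A rough sketch of this flow; `DocsiteProcessor` is not included in this diff, so its import path and method name here are placeholders rather than the actual API:

```python
from docsite_processor import DocsiteProcessor  # hypothetical import path
from docsite_indexer import DocsiteIndexer

# Download and chunk the docs (placeholder method name), then embed and upload.
processor = DocsiteProcessor()
inputs, metadata_dict = processor.get_chunks()  # placeholder

indexer = DocsiteIndexer(collection_name="Docsite-20250225")
indexer.insert_documents(inputs=inputs, metadata_dict=metadata_dict)
```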

## Payload Reference
The input payload is a JSON object with the following structure:

```js
{
  "docs_to_upload": ["adaptor_docs", "general_docs", "adaptor_functions"], // Select 1-3 types of documentation to upload
  "collection_name": "Docsite-20250225", // Name of the collection in the vector database.
}
```

> **Collaborator comment** (on `docs_to_upload`): I think we should be able to default all this.
>
> We should generate the collection name according to the versioning strategy (see my earlier comment). To be fair, versioning by date is also a great solution and I'm quite happy to roll with it.
>
> By default we should upload all docs, but users should be able to specify fewer if they want.
>
> It would be more conventional to ask the user to pass an API key and Pinecone URL. Maybe apollo should have its own credentials for this... but also maybe not? I think we should give this more thought.
>
> But users must be able to pass credentials, even if apollo provides a default.

142 changes: 142 additions & 0 deletions services/embed_docsite/docsite_indexer.py
@@ -0,0 +1,142 @@
import os
import time
from datetime import datetime
import json
import pandas as pd
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import DataFrameLoader
from util import create_logger, ApolloError

logger = create_logger("DocsiteIndexer")

class DocsiteIndexer:
    """
    Initialize vectorstore and insert new documents. Create a new index if needed.

    :param collection_name: Vectorstore collection name (namespace) to store documents
    :param index_name: Vectorstore index name (default: docsite)
    :param embeddings: LangChain embedding type (default: OpenAIEmbeddings())
    :param dimension: Embedding dimension (default: 1536 for OpenAI Embeddings)
    """
    def __init__(self, collection_name, index_name="docsite", embeddings=OpenAIEmbeddings(), dimension=1536):
        self.collection_name = collection_name
        self.index_name = index_name
        self.embeddings = embeddings
        self.dimension = dimension
        self.pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

        if not self.index_exists():
            self.create_index()

        self.index = self.pc.Index(self.index_name)
        self.vectorstore = PineconeVectorStore(index_name=index_name, namespace=collection_name, embedding=embeddings)

    def index_exists(self):
        """Check if the index exists in Pinecone."""
        existing_indexes = [index_info["name"] for index_info in self.pc.list_indexes()]

        return self.index_name in existing_indexes

    def create_index(self):
        """Creates a new Pinecone index if it does not exist."""

        if not self.index_exists():
            self.pc.create_index(
                name=self.index_name,
                dimension=self.dimension,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
            while not self.pc.describe_index(self.index_name).status["ready"]:
                time.sleep(1)

    def delete_collection(self):
        """Deletes the entire collection (namespace) and all its contents.
        This operation cannot be undone and removes both the collection structure
        and all vectors/documents within it."""
        self.index.delete(delete_all=True, namespace=self.collection_name)

    def preprocess_metadata(self, inputs, page_content_column="text", add_chunk_as_metadata=False, metadata_cols=None, metadata_dict=None):
        """
        Create a DataFrame for indexing from input documents and metadata.

        :param inputs: Dictionary containing name, docs_type, and doc_chunk
        :param page_content_column: Name of the field which will be embedded (default: text)
        :param add_chunk_as_metadata: Copy the text to embed as a separate metadata field (default: False)
        :param metadata_cols: Optional list of metadata columns to include (default: None)
        :param metadata_dict: Dictionary mapping names to metadata dictionaries (default: None)
        :return: pandas.DataFrame with text and metadata columns
        """

        # Create DataFrame from the inputs (doc_chunk, name, docs_type)
        df = pd.DataFrame(inputs)

        # Rename some columns for metadata upload
        df = df.rename(columns={"doc_chunk": page_content_column, "name": "doc_title"})

        df["doc_title"] = df["doc_title"].str.replace(r"\.md$", "", regex=True)

        # Optionally add chunk to metadata for keyword searching
        if add_chunk_as_metadata:
            df["embedding_text"] = df[page_content_column]

        # Add further metadata columns if specified
        if metadata_cols:
            for col in metadata_cols:
                df[col] = metadata_dict.get(inputs["name"], {}).get(col)

        return df

    def insert_documents(self, inputs, metadata_dict):
        """
        Create the index if it does not exist and insert the input documents.

        :param inputs: Dictionary containing name, docs_type, and doc_chunk
        :param metadata_dict: Metadata dict with document titles as keys (from DocsiteProcessor)
        :return: List of IDs for the inserted documents
        """

        # Get vector count before insertion for verification
        try:
            stats = self.index.describe_index_stats()
            vectors_before = stats.namespaces.get(self.collection_name, {}).get("vector_count", 0)
            logger.info(f"Current vector count in namespace '{self.collection_name}': {vectors_before}")
        except Exception as e:
            logger.warning(f"Could not get vector count before insertion: {str(e)}")
            vectors_before = 0

        df = self.preprocess_metadata(inputs=inputs, metadata_dict=metadata_dict)
        logger.info("Input metadata preprocessed")
        loader = DataFrameLoader(df, page_content_column="text")
        docs = loader.load()
        logger.info("Inputs processed into LangChain docs")
        logger.info(f"Uploading {len(docs)} documents to index...")

        idx = self.vectorstore.add_documents(
            documents=docs
        )
        sleep_time = 30
        logger.info(f"Waiting for {sleep_time}s to verify upload count")
        time.sleep(sleep_time)

        # Verify the upload by checking the vector count
        try:
            stats = self.index.describe_index_stats()
            vectors_after = stats.namespaces.get(self.collection_name, {}).get("vector_count", 0)
            logger.info(f"Vector count after insertion: {vectors_after}")

            if vectors_after > vectors_before:
                logger.info(f"Successfully added {vectors_after - vectors_before} vectors to namespace '{self.collection_name}'")
            else:
                logger.warning(f"No new vectors were added to namespace '{self.collection_name}' after {sleep_time}s")
        except Exception as e:
            logger.warning(f"Could not verify vector insertion: {str(e)}")

        return idx
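Not part of the diff: a sketch of how the uploaded collection might later be queried for RAG, reusing the index and namespace conventions from `DocsiteIndexer` (the query string and `k` are illustrative):

```python
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

vectorstore = PineconeVectorStore(
    index_name="docsite",            # default index_name in DocsiteIndexer
    namespace="Docsite-20250225",    # collection_name from the payload
    embedding=OpenAIEmbeddings(),
)

# Fetch the three chunks most similar to an example query.
results = vectorstore.similarity_search("How do I use the http adaptor?", k=3)
for doc in results:
    print(doc.metadata.get("doc_title"), doc.page_content[:80])
```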




