
Docsite rag #176

Open

wants to merge 19 commits into base: main

Changes from 16 commits
5 changes: 5 additions & 0 deletions .changeset/plenty-lizards-sell.md
@@ -0,0 +1,5 @@
---
"apollo": minor
---

Add new docsite rag
38 changes: 38 additions & 0 deletions services/embed_docsite/README.md
@@ -0,0 +1,38 @@
## Embed Docsite (RAG)

This service embeds the OpenFn documentation into a vector database (Pinecone). It downloads the documentation, chunks it, processes the metadata, generates embeddings, and uploads everything to Pinecone.

## Usage - Embedding OpenFn Documentation

The vector database used here is Pinecone. To obtain the required environment variables, follow these steps:

1. Create an account on Pinecone and set up a free cluster.
2. Obtain the cluster URL and API key and add them to the `.env` file (the indexer reads `PINECONE_API_KEY`).
3. You'll also need an OpenAI API key (`OPENAI_API_KEY`) to generate embeddings.

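As a quick sanity check (a sketch, not part of the service), you can confirm the keys are visible to Python before running anything; `PINECONE_API_KEY` is the variable read by the indexer in this PR, and `OPENAI_API_KEY` is the conventional variable used by the OpenAI client:

```python
import os

# Variables the embed_docsite service expects to find in the environment.
required = ("PINECONE_API_KEY", "OPENAI_API_KEY")
missing = [key for key in required if not os.environ.get(key)]
if missing:
    raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
print("All required keys are set.")
```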

### With the CLI, returning to stdout:

```bash
openfn apollo embed_docsite tmp/payload.json
```
To run directly from this repo (note that the server must be started):

```bash
bun py embed_docsite tmp/payload.json -O
```
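
If you don't already have a payload file, a minimal `tmp/payload.json` can be written like this (a sketch; the fields mirror the Payload Reference below, and the collection name is only an example):

```python
import json
import os

payload = {
    # Which documentation sets to embed (any subset of the three).
    "docs_to_upload": ["adaptor_docs", "general_docs", "adaptor_functions"],
    # Collection (namespace) name in the vector database.
    "collection_name": "Docsite-20250225",
}

os.makedirs("tmp", exist_ok=True)
with open("tmp/payload.json", "w") as f:
    json.dump(payload, f, indent=2)
```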

## Implementation
The service uses the DocsiteProcessor to download the documentation and chunk it into smaller parts. The DocsiteIndexer then formats the metadata, creates the index if it does not exist, embeds the chunked texts with OpenAI, and uploads them to the vector database (Pinecone). A usage sketch is shown below.

The chunked texts can be viewed in `tmp/split_sections`.
Collaborator comment:

Ooh nice - this is still true? I'd love to run a test tomorrow and have a look, just to see what's going on.

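For orientation, here is roughly how the two classes fit together (a sketch based on the code in this PR; the module paths follow the file layout under `services/embed_docsite`, and the collection name is only an example):

```python
from embed_docsite.docsite_processor import DocsiteProcessor
from embed_docsite.docsite_indexer import DocsiteIndexer

# Download and chunk one documentation set.
processor = DocsiteProcessor(docs_type="adaptor_docs")
chunks, metadata_dict = processor.get_preprocessed_docs()

# Embed the chunks and upload them to Pinecone.
indexer = DocsiteIndexer(collection_name="Docsite-20250225")
indexer.create_index()
indexer.insert_documents(inputs=chunks, metadata_dict=metadata_dict)
```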


## Payload Reference
The input payload is a JSON object with the following structure:

```js
{
"docs_to_upload": ["adaptor_docs", "general_docs", "adaptor_functions"], // Select 1-3 types of documentation to upload
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be able to default all this

We should generate the collection name according to the versioning strategy (see my earlier comment). To be fair, versioning by date is also a great solution and I'm quite happy to roll with it.

By default we should upload all docs, but users should be able to specify fewer if they want.

It would be more conventional to ask the user to pass an api key and pinecone URL. Maybe apollo should have its own credentials for this.. but also maybe not? I think we should give this more thought.

But users must be able to pass credentials, even if if apollo provides a default

"collection_name": "Docsite-20250225", // Name of the collection in the vector database.
}
```

137 changes: 137 additions & 0 deletions services/embed_docsite/docsite_indexer.py
@@ -0,0 +1,137 @@
import os
import time
from datetime import datetime
import json
import pandas as pd
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import DataFrameLoader
from util import create_logger, ApolloError

logger = create_logger("DocsiteIndexer")

class DocsiteIndexer:
"""
Initialize vectorstore and insert new documents. Create a new index if needed.

:param collection_name: Vectorstore collection name (namespace) to store documents
    :param index_name: Vectorstore index name (default: docsite)
:param embeddings: LangChain embedding type (default: OpenAIEmbeddings())
:param dimension: Embedding dimension (default: 1536 for OpenAI Embeddings)
"""
def __init__(self, collection_name, index_name="docsite", embeddings=OpenAIEmbeddings(), dimension=1536):
self.collection_name = collection_name
self.index_name = index_name
self.embeddings = embeddings
self.dimension = dimension
self.pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))
self.index = self.pc.Index(self.index_name)
self.vectorstore = PineconeVectorStore(index_name=index_name, namespace=collection_name, embedding=embeddings)

def index_exists(self):
"""Check if the index exists in Pinecone."""
existing_indexes = [index_info["name"] for index_info in self.pc.list_indexes()]

return self.index_name in existing_indexes

def create_index(self):
"""Creates a new Pinecone index if it does not exist."""

if not self.index_exists():
self.pc.create_index(
name=self.index_name,
dimension=self.dimension,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
while not self.pc.describe_index(self.index_name).status["ready"]:
time.sleep(1)

def delete_collection(self):
"""Deletes the entire collection (namespace) and all its contents.
This operation cannot be undone and removes both the collection structure
and all vectors/documents within it."""
self.index.delete(delete_all=True, namespace=self.collection_name)

def preprocess_metadata(self, inputs, page_content_column="text", add_chunk_as_metadata=False, metadata_cols=None, metadata_dict=None):
"""
Create a DataFrame for indexing from input documents and metadata.

:param inputs: Dictionary containing name, docs_type, and doc_chunk
:param page_content_column: Name of the field which will be embedded (default: text)
:param add_chunk_as_metadata: Copy the text to embed as a separate metadata field (default: False)
:param metadata_cols: Optional list of metadata columns to include (default: None)
:param metadata_dict: Dictionary mapping names to metadata dictionaries (default: None)
:return: pandas.DataFrame with text and metadata columns
"""

# Create DataFrame from the inputs (doc_chunk, name, docs_type)
df = pd.DataFrame(inputs)

# Rename some columns for metadata upload
df = df.rename(columns={"doc_chunk": page_content_column, "name": "doc_title"})

df["doc_title"] = df["doc_title"].str.replace(".md$", "", regex=True)

# Optionally add chunk to metadata for keyword searching
if add_chunk_as_metadata:
df["embedding_text"] = df[page_content_column]

# Add further metadata columns if specified
if metadata_cols:
for col in metadata_cols:
df[col] = metadata_dict.get(inputs["name"], {}).get(col)

return df

def insert_documents(self, inputs, metadata_dict):
"""
Create the index if it does not exist and insert the input documents.

:param inputs: Dictionary containing name, docs_type, and doc_chunk
:param metadata_dict: Metadata dict with document titles as keys (from DocsiteProcessor)
        :return: List of IDs of the documents added to the vectorstore
"""
# Get vector count before insertion for verification
try:
stats = self.index.describe_index_stats()
vectors_before = stats.namespaces.get(self.collection_name, {}).get("vector_count", 0)
logger.info(f"Current vector count in namespace '{self.collection_name}': {vectors_before}")
except Exception as e:
logger.warning(f"Could not get vector count before insertion: {str(e)}")
vectors_before = 0

df = self.preprocess_metadata(inputs=inputs, metadata_dict=metadata_dict)
logger.info(f"Input metadata preprocessed")
loader = DataFrameLoader(df, page_content_column="text")
docs = loader.load()
logger.info(f"Inputs processed into LangChain docs")
logger.info(f"Uploading {len(docs)} documents to index...")

idx = self.vectorstore.add_documents(
documents=docs
)
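        # describe_index_stats() can lag behind recent upserts, so pause briefly
        # before verifying the vector count below.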
sleep_time = 30
logger.info(f"Waiting for {sleep_time}s to verify upload count")
time.sleep(sleep_time)

# Verify the upload by checking the vector count
try:
stats = self.index.describe_index_stats()
vectors_after = stats.namespaces.get(self.collection_name, {}).get("vector_count", 0)
logger.info(f"Vector count after insertion: {vectors_after}")

if vectors_after > vectors_before:
logger.info(f"Successfully added {vectors_after - vectors_before} vectors to namespace '{self.collection_name}'")
else:
logger.warning(f"No new vectors were added to namespace '{self.collection_name}' after {sleep_time}s")
except Exception as e:
logger.warning(f"Could not verify vector insertion: {str(e)}")

return idx





187 changes: 187 additions & 0 deletions services/embed_docsite/docsite_processor.py
@@ -0,0 +1,187 @@
import json
import os
import logging
import re
import requests
import nltk
from embed_docsite.github_utils import get_docs
from util import create_logger, ApolloError

nltk.download('punkt_tab')

logger = create_logger("DocsiteProcessor")

class DocsiteProcessor:
"""
Processes documentation sites by cleaning, splitting, and chunking text.

:param docs_type: Type of documentation being processed ("adaptor_functions", "general_docs", "adaptor_docs")
:param output_dir: Directory to store processed chunks (default: "./tmp/split_sections").
"""
def __init__(self, docs_type, output_dir="./tmp/split_sections"):
self.output_dir = output_dir
self.docs_type = docs_type
self.metadata_dict = None

def _write_chunks_to_file(self, chunks, file_name):
"""
Writes a list of documentation chunks to a JSON file in the specified directory.

        :param chunks: List of chunk dictionaries to write to the file
        :param file_name: Name of the file to write to
"""
os.makedirs(self.output_dir, exist_ok=True)
output_file = os.path.join(self.output_dir, file_name)
if os.path.exists(output_file):
try:
os.remove(output_file)
logger.info(f"Existing output file '{output_file}' has been deleted.")
except OSError as e:
logger.error(f"Error deleting the file {output_file}: {e}")

# Write to a JSON file
with open(output_file, 'w') as f:
json.dump(chunks, f, indent=2)

logger.info(f"Content written to {output_file}")

def _clean_html(self, text):
"""Remove HTML tags while preserving essential formatting."""
text = re.sub(r'<\/?p>', '\n', text) # Convert <p> to newlines
text = re.sub(r'<\/?code>', '`', text) # Convert <code> to backticks
text = re.sub(r'<\/?strong>', '**', text) # Convert <strong> to bold
text = re.sub(r'<[^>]+>', '', text) # Remove other HTML tags

return text.strip()

def _split_by_headers(self, text):
"""Split text into chunks based on Markdown headers (# and ##) and code blocks."""
sections = re.split(r'(?=^#+\s.*$|^```(?:.*\n[\s\S]*?^```))', text, flags=re.MULTILINE)

return [chunk.strip() for chunk in sections if chunk.strip()]

def _split_oversized_chunks(self, chunks, target_length):
"""Check if chunks are over the target lengths, and split them further if needed."""
result = []

for chunk in chunks:
if len(chunk) <= target_length:
result.append(chunk)
else:
# Chunk is too big, split by newlines
lines = chunk.split('\n')
current_chunk = ""

for line in lines:
# If adding this line would exceed target size and we already have content
if len(current_chunk) + len(line) + 1 > target_length and current_chunk:
result.append(current_chunk)
current_chunk = line
else:
# Add a newline if the chunk isn't empty
if current_chunk:
current_chunk += '\n'
current_chunk += line

# Add the last chunk
if current_chunk:
result.append(current_chunk)

return result

def _accumulate_chunks(self, splits, target_length, overlap, min_length):
"""Merge smaller chunks to get as close to target_length as possible."""

accumulated = []
current_chunk = ""
last_overlap_length = 0
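        # Greedily append splits until adding the next one would push the chunk
        # past target_length; each completed chunk seeds the next one with a small
        # overlap (the last `overlap` lines for adaptor functions, otherwise the
        # last `overlap` sentences).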

for split in splits:
if len(current_chunk) + len(split) <= target_length:
current_chunk += split
else:
if len(current_chunk) >= min_length:
accumulated.append(current_chunk) # Store the completed chunk

# add overlap
if self.docs_type == "adaptor_functions":
overlap_sections = " ".join(current_chunk.split("\n")[-overlap:])
else:
overlap_sections = " ".join(nltk.sent_tokenize(current_chunk)[-overlap:]) # Split by sentences (doesn't split code)
current_chunk = overlap_sections + split # Start a new chunk
last_overlap_length = len(overlap_sections)
else:
# Current chunk is too small, add the next split even though it exceeds target_length
current_chunk += split

if current_chunk:
if len(current_chunk) >= min_length or len(accumulated)==0:
accumulated.append(current_chunk)
else:
current_chunk = current_chunk[last_overlap_length:] # Avoid duplication inside one chunk
filler_char = min_length - len(current_chunk)
if len(accumulated[-1]) > filler_char:
filler_overlap = accumulated[-1][-filler_char:]
accumulated.append(filler_overlap + current_chunk)
else:
accumulated[-1] = accumulated[-1] + current_chunk

return accumulated

def _chunk_adaptor_docs(self, json_data, target_length=1000, overlap=1, min_length=700):
"""
        Extract and clean docs from adaptor data, and chunk them according to the target and minimum chunk sizes.

:param json_data: JSON containing adaptor data dictionaries with the keys "docs" (text) and "name" (text title)
:param target_length: Target chunk size in characters (default: 1000)
:param overlap: Target number of sentences to overlap between chunks (default: 1)
:param min_length: Minimum chunk size in characters (default: 700)

:return: List of chunk dictionaries {name, docs_type, doc_chunk} and a dictionary mapping adaptor_name to original data dictionary
"""

output = []
metadata_dict = dict()

for item in json_data:
if isinstance(item, dict) and "docs" in item and "name" in item:

docs = item["docs"]
name = item["name"]

# Decode JSON string
try:
docs = json.loads(docs)
except json.JSONDecodeError:
pass

docs = self._clean_html(docs)

# Save all fields for adding to metadata later
item["docs"] = docs # replace docs with cleaned text
metadata_dict[name] = item

# Split by headers, and where needed, sentences
splits = self._split_by_headers(docs)
splits = self._split_oversized_chunks(chunks=splits, target_length=target_length)
chunks = self._accumulate_chunks(splits=splits, target_length=target_length, overlap=overlap, min_length=min_length)

for chunk in chunks:
output.append({"name": name, "docs_type": self.docs_type, "doc_chunk": chunk})

# self.metadata_dict = metadata_dict
self._write_chunks_to_file(chunks=output, file_name=f"{self.docs_type}_chunks.json")

return output, metadata_dict

def get_preprocessed_docs(self):
"""Fetch and process adaptor data."""
# Step 1: Download docs
docs = get_docs(docs_type=self.docs_type)

# Step 2: Process adaptor data
chunks, metadata_dict = self._chunk_adaptor_docs(docs)

logger.info(f"{self.docs_type} docs preprocessed and chunked")

return chunks, metadata_dict