
Sweep: Fix the multi-processing error in sweepai/core/vector_db.py by setting min 1 process #3296

Closed

kevinlu1248 opened this issue Mar 15, 2024 · 2 comments · Fixed by #3304 · May be fixed by #3298, #3299 or #3301
Labels: sweep (Assigns Sweep to an issue or pull request.)

Comments

@kevinlu1248 (Member) commented Mar 15, 2024

Details

ValueError: Number of processes must be at least 1

Branch

No response
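
For reference, this failure is easy to reproduce: on a host reporting fewer than 4 CPU cores, `multiprocessing.cpu_count() // 4` evaluates to 0, and the standard library rejects a zero-process pool. A minimal sketch (the low core count is simulated here for illustration):

```python
import multiprocessing

# On a hypothetical 2-core host: 2 // 4 == 0, and Pool(processes=0)
# is rejected by the standard library with exactly this ValueError.
simulated_cpu_count = 2
processes = simulated_cpu_count // 4

try:
    with multiprocessing.Pool(processes=processes) as pool:
        pass
except ValueError as e:
    print(e)  # Number of processes must be at least 1
```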

sweep-nightly bot (Contributor) commented Mar 15, 2024

Sweeping

✨ Track Sweep's progress on our progress dashboard!

💎 Sweep Pro: I'm using GPT-4. You have unlimited GPT-4 tickets. (tracking ID: 0a34477778)

I am currently looking into this ticket! I will update the progress of the ticket in this comment. I am currently searching through your code, looking for relevant snippets.


Step 1: 🔎 Searching

I'm searching for relevant snippets in your repository. If this is your first time using Sweep, I'm indexing your repository. You can monitor the progress using the progress dashboard.



@kevinlu1248 kevinlu1248 changed the title Sweep: Fix the multi-processing error in vector_db by setting min 1 proces Sweep: Fix the multi-processing error in sweepai/core/vector_db.py by setting min 1 process Mar 15, 2024
sweep-nightly bot (Contributor) commented Mar 15, 2024

🚀 Here's the PR! #3304

See Sweep's progress at the progress dashboard!
💎 Sweep Pro: I'm using GPT-4. You have unlimited GPT-4 tickets. (tracking ID: 5f98f46fd3)

Step 1: 🔎 Searching

I found the following snippets in your repository. I will now analyze these snippets and come up with a plan.

Some code snippets I think are relevant, in decreasing order of relevance. If some file is missing from here, you can mention the path in the ticket description.

From `sweepai/core/vector_db.py`:

```python
import json
import multiprocessing
from typing import Generator

import backoff
import numpy as np
import requests
from loguru import logger
from openai import AzureOpenAI, OpenAI
from redis import Redis
from tqdm import tqdm

from sweepai.config.server import (
    BATCH_SIZE,
    OPENAI_API_TYPE,
    OPENAI_EMBEDDINGS_API_TYPE,
    OPENAI_EMBEDDINGS_AZURE_API_KEY,
    OPENAI_EMBEDDINGS_AZURE_API_VERSION,
    OPENAI_EMBEDDINGS_AZURE_DEPLOYMENT,
    OPENAI_EMBEDDINGS_AZURE_ENDPOINT,
    REDIS_URL,
)
from sweepai.logn.cache import file_cache
from sweepai.utils.hash import hash_sha256
from sweepai.utils.utils import Tiktoken

# Select the embeddings client based on the configured API type.
if OPENAI_EMBEDDINGS_API_TYPE == "openai":
    client = OpenAI()
elif OPENAI_EMBEDDINGS_API_TYPE == "azure":
    client = AzureOpenAI(
        azure_endpoint=OPENAI_EMBEDDINGS_AZURE_ENDPOINT,
        api_key=OPENAI_EMBEDDINGS_AZURE_API_KEY,
        azure_deployment=OPENAI_EMBEDDINGS_AZURE_DEPLOYMENT,
        api_version=OPENAI_EMBEDDINGS_AZURE_API_VERSION,
    )
else:
    raise ValueError(f"Invalid OPENAI_API_TYPE: {OPENAI_API_TYPE}")

CACHE_VERSION = "v1.3.04"
redis_client: Redis = Redis.from_url(REDIS_URL)  # TODO: add lazy loading
tiktoken_client = Tiktoken()


def cosine_similarity(a, B):
    dot_product = np.dot(B, a.T)  # B is MxN, a.T is Nx1, resulting in Mx1
    norm_a = np.linalg.norm(a)
    norm_B = np.linalg.norm(B, axis=1)
    return dot_product.flatten() / (norm_a * norm_B)  # Flatten to make it a 1D array


def chunk(texts: list[str], batch_size: int) -> Generator[list[str], None, None]:
    logger.info(f"Truncating {len(texts)} texts")
    texts = [text[:25000] if len(text) > 25000 else text for text in texts]
    # replace empty strings with a single space so the API accepts them
    texts = [text if text else " " for text in texts]
    logger.info(f"Finished truncating {len(texts)} texts")
    for i in range(0, len(texts), batch_size):
        yield texts[i : i + batch_size] if i + batch_size < len(texts) else texts[i:]


@file_cache(ignore_params=["texts"])
def get_query_texts_similarity(query: str, texts: str) -> float:
    embeddings = embed_text_array(texts)
    embeddings = np.concatenate(embeddings)
    query_embedding = np.array(embed_text_array([query])[0])
    similarity = cosine_similarity(query_embedding, embeddings)
    similarity = similarity.tolist()
    return similarity


def normalize_l2(x):
    x = np.array(x)
    if x.ndim == 1:
        norm = np.linalg.norm(x)
        if norm == 0:
            return x
        return x / norm
    else:
        norm = np.linalg.norm(x, 2, axis=1, keepdims=True)
        return np.where(norm == 0, x, x / norm)


# lru_cache(maxsize=20)
def embed_text_array(texts: tuple[str]) -> list[np.ndarray]:
    embeddings = []
    texts = [text if text else " " for text in texts]
    batches = [texts[i : i + BATCH_SIZE] for i in range(0, len(texts), BATCH_SIZE)]
    # NOTE: cpu_count() // 4 evaluates to 0 on hosts with fewer than 4 cores,
    # which is the source of the ValueError reported in this issue.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() // 4) as pool:
        embeddings = list(
            tqdm(
                pool.imap(openai_with_expo_backoff, batches),
                total=len(batches),
                desc="openai embedding",
            )
        )
    return embeddings


def openai_call_embedding(batch):
    response = client.embeddings.create(
        input=batch, model="text-embedding-3-small", encoding_format="float"
    )
    # Truncate each embedding to 512 dimensions, then re-normalize.
    cut_dim = np.array([data.embedding for data in response.data])[:, :512]
    normalized_dim = normalize_l2(cut_dim)
    # save results to redis
    return normalized_dim


@backoff.on_exception(
    backoff.expo,
    requests.exceptions.Timeout,
    max_tries=5,
)
def openai_with_expo_backoff(batch: tuple[str]):
    if not redis_client:
        return openai_call_embedding(batch)
    # check cache first
    embeddings = [None] * len(batch)
    cache_keys = [hash_sha256(text) + CACHE_VERSION for text in batch]
    try:
        for i, cache_value in enumerate(redis_client.mget(cache_keys)):
            if cache_value:
                embeddings[i] = np.array(json.loads(cache_value))
    except Exception as e:
        logger.exception(e)
    # not stored in cache, call openai
    batch = [
        text for i, text in enumerate(batch) if embeddings[i] is None
    ]  # remove all the cached values from the batch
    if len(batch) == 0:
        embeddings = np.array(embeddings)
        return embeddings  # all embeddings are in cache
    try:
        # make sure all token counts are within model params (max: 8192)
        new_embeddings = openai_call_embedding(batch)
    except requests.exceptions.Timeout as e:
        logger.exception(f"Timeout error occurred while embedding: {e}")
    except Exception as e:
        logger.exception(e)
        if any(tiktoken_client.count(text) > 8192 for text in batch):
            logger.warning(
                f"Token count exceeded for batch: {max([tiktoken_client.count(text) for text in batch])} truncating down to 8192 tokens."
            )
            batch = [tiktoken_client.truncate_string(text) for text in batch]
            new_embeddings = openai_call_embedding(batch)
    # get all indices where embeddings are None
    indices = [i for i, emb in enumerate(embeddings) if emb is None]
    # store the new embeddings in the correct position
    assert len(indices) == len(new_embeddings)
    for i, index in enumerate(indices):
        embeddings[index] = new_embeddings[i]
    # store in cache
    try:
        redis_client.mset(
            {
                cache_key: json.dumps(embedding.tolist())
                for cache_key, embedding in zip(cache_keys, embeddings)
            }
        )
        embeddings = np.array(embeddings)
    except Exception as e:
        logger.error(str(e))
        logger.error("Failed to store embeddings in cache, returning without storing")
    return embeddings


if __name__ == "__main__":
    texts = ["sasxt " * 10000 for i in range(10)] + ["abb " * 1 for i in range(10)]
```

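As an aside, the shape conventions in `cosine_similarity` above are easy to check with toy vectors; here is a self-contained sketch reusing only the function body shown in the snippet:

```python
import numpy as np

def cosine_similarity(a, B):
    # B is MxN (one embedding per row), a is 1xN; the result is a 1D array of M scores.
    dot_product = np.dot(B, a.T)
    norm_a = np.linalg.norm(a)
    norm_B = np.linalg.norm(B, axis=1)
    return dot_product.flatten() / (norm_a * norm_B)

B = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])  # three toy 2-D "embeddings"
a = np.array([[1.0, 0.0]])                          # a single query vector
print(cosine_similarity(a, B))  # [1.0, 0.0, ~0.707]
```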

Step 2: ⌨️ Coding

Modify sweepai/core/vector_db.py with contents:
• Locate the `embed_text_array` function within the file.
• In the line where the multiprocessing pool is initialized (currently line 87), change the calculation of the number of processes to ensure it's at least 1. Specifically, modify the line from `with multiprocessing.Pool(processes=multiprocessing.cpu_count() // 4) as pool:` to `with multiprocessing.Pool(processes=max(1, multiprocessing.cpu_count() // 4)) as pool:`.
• This change uses the `max` function to ensure that the number of processes is at least 1, even if `multiprocessing.cpu_count() // 4` evaluates to 0. This directly addresses the issue reported without impacting the intended functionality of distributing the workload across multiple processes when possible.
```diff
---
+++
@@ -85,7 +85,7 @@
     embeddings = []
     texts = [text if text else " " for text in texts]
     batches = [texts[i : i + BATCH_SIZE] for i in range(0, len(texts), BATCH_SIZE)]
-    with multiprocessing.Pool(processes=multiprocessing.cpu_count() // 4) as pool:
+    with multiprocessing.Pool(processes=max(1, multiprocessing.cpu_count() // 4)) as pool:
         embeddings = list(
             tqdm(
                 pool.imap(openai_with_expo_backoff, batches),
```
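
As a quick sanity check of this change (standard library only), the new formula never drops below one process, whatever the core count:

```python
import multiprocessing

# The old formula yields 0 on hosts with fewer than 4 cores; the new one never does.
for cores in (1, 2, 3, 4, 8, 64):
    print(f"{cores} cores -> old: {cores // 4}, new: {max(1, cores // 4)}")

# With the fix applied, constructing the pool succeeds even on small hosts.
with multiprocessing.Pool(processes=max(1, multiprocessing.cpu_count() // 4)) as pool:
    print("pool created successfully")
```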
• Running GitHub Actions for sweepai/core/vector_db.py

Ran GitHub Actions for ccfea2cae869780d4b83337ca69da234a1402ad9.

Step 3: 🔁 Code Review

I have finished reviewing the code for completeness. I did not find errors for sweep/fix_the_multiprocessing_error_in_sweepai.



kevinlu1248 added a commit that referenced this issue Mar 15, 2024
… setting min 1 process (#3304)

# Description
This pull request addresses an issue in `sweepai/core/vector_db.py` where
the number of processes for the multiprocessing pool could be set to zero
on machines with a low number of CPU cores. The fix ensures that there is
always at least one process available for the pool, preventing potential
errors during the embedding of text arrays.

# Summary
- Fixed a potential zero-process pool error in
`sweepai/core/vector_db.py` by ensuring the multiprocessing pool is
always initialized with at least one process.
- Modified the calculation of the number of processes for the
`multiprocessing.Pool` to `max(1, multiprocessing.cpu_count() // 4)`,
guaranteeing a minimum of one process.
- This change affects the `embed_text_array` function within the
`vector_db.py` file, improving its reliability on systems with fewer CPU
cores.

Fixes #3296.
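
A side note, not part of the merged change: `multiprocessing.cpu_count()` itself raises `NotImplementedError` on platforms where the core count cannot be determined (it wraps `os.cpu_count()`, which returns `None` there). If that edge case ever mattered, a more defensive variant could look like this sketch:

```python
import multiprocessing
import os

# Hypothetical hardening beyond the merged fix: os.cpu_count() may return None
# on unusual platforms, so fall back to 1 before dividing.
processes = max(1, (os.cpu_count() or 1) // 4)

with multiprocessing.Pool(processes=processes) as pool:
    pass
```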
