Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory leak when using AsyncElasticsearch #2478

Open
teuneboon opened this issue Mar 21, 2024 · 4 comments
Open

Memory leak when using AsyncElasticsearch #2478

teuneboon opened this issue Mar 21, 2024 · 4 comments

Comments

@teuneboon
Copy link

teuneboon commented Mar 21, 2024

Elasticsearch version (bin/elasticsearch --version): 8.2.0

elasticsearch-py version (elasticsearch.__versionstr__): 8.12.0

Python version: 3.9.2

Description of the problem including expected versus actual behavior:
We run an API with an endpoint that does a call to Elasticsearch. In this endpoint we initialize AsyncElasticsearch, run a search query(might be multiple in the future, but just one for now) and close the connection to Elasticsearch. We noticed that if this API endpoint is called a lot, memory used by the process running the API keeps increasing until the process is killed because it goes OOM.

Steps to reproduce:
I isolated the issue in a relatively simple script:

import asyncio

from elasticsearch import AsyncElasticsearch

SERVERS = [
    'https://elk001:9200',
    'https://elk002:9200',
    'https://elk003:9200',
]
INDEX = 'logs'
API_KEY = 'xxx'


async def leaky():
    while True:
        es = AsyncElasticsearch(SERVERS, api_key=API_KEY)
        async with es as client:
            await client.search(
                index=INDEX,
                body={
                    'from': 0,
                    'size': 0,
                    'query': {
                        'bool': {
                            'must': [],
                            'filter': [],
                            'should': [],
                            'must_not': [],
                        },
                    },
                },
            )
        print('completed a query')


if __name__ == '__main__':
    asyncio.run(leaky())

If you run this memory usage will quickly(< 1 minute in our setup) increase to about 1GiB and beyond. If you pull the es = AsyncElasticsearch initialization out of the while True loop memory still increases, but much more slowly(although unless I'm missing something, while it might not be best practice it still shouldn't leak that fast when it's inside the loop).

What I didn't test:
I didn't have time to fully analyze this with memory profilers. I'm also not sure if it's only search queries that are affected by this or if simple initializing AsyncElasticsearch without running any query already causes the leak to happen(or if any other request leaks). Didn't test whether the api key or SSL has an effect either. I just wanted an isolated testcase to confirm I was still sane. We solved this in the end by just switching back to the sync Elasticsearch client since we're not executing queries in parallel any time soon, but I still thought I'd report it in case others run into this issue.

@pquentin
Copy link
Member

pquentin commented Mar 22, 2024

Thanks @teuneboon, I can reproduce this! 🎉 My observations:

  • Please don't do that. :) HTTP clients reuse connections to amortize the cost of TCP and TLS handshakes. Creating one new client per request defeats that. I still would like to fix the memory usage if possible. The slower leak when you create a single instance is probably a totally different issue.
  • The required ingredients are 1/ AsyncElasticsearch 2/ SSL. The actual request does not matter (I reproduce with client.info()) and using a single node is also enough to reproduce. AsyncElasticsearch uses aiohttp, which has known memory leaks.
  • After about ~30 seconds, the memory usage stabilizes (see figure), which suggests it's maybe not an actual leak, but a reference cycle. However, running gc.collect() in the loop did not help.

leak

The next steps are using memray to understand the peak usage in more detail and trying to reproduce with aiohttp.

@pquentin
Copy link
Member

Here's my current attempt with aiohttp:

import asyncio
import aiohttp

async def leaky():
    i = 0
    while i <= 1500:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://localhost:9200/",
                auth=aiohttp.BasicAuth("elastic", "changeme"),
                ssl=False,
            ) as response:
                assert response.status == 200
                await response.text()
        i += 1
        if i % 100 == 0:
            print(i)

if __name__ == "__main__":
    asyncio.run(leaky())

It inexplicably fails after 1000 connections with:

Traceback (most recent call last):
  File "/.../.virtualenvs/elasticsearch-py/lib64/python3.12/site-packages/aiohttp/connector.py", line 1173, in _create_direct_connection
    hosts = await asyncio.shield(host_resolved)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../.virtualenvs/elasticsearch-py/lib64/python3.12/site-packages/aiohttp/connector.py", line 884, in _resolve_host
    addrs = await self._resolver.resolve(host, port, family=self._family)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../.virtualenvs/elasticsearch-py/lib64/python3.12/site-packages/aiohttp/resolver.py", line 33, in resolve
    infos = await self._loop.getaddrinfo(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/asyncio/base_events.py", line 899, in getaddrinfo
    return await self.run_in_executor(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib64/python3.12/socket.py", line 963, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 16] Device or resource busy

And only partly reproduces the leak:

aiohttp

@pquentin
Copy link
Member

I just remembered that the upcoming release later this month will include HTTPX support, so I tried it too.

import asyncio
from elasticsearch import AsyncElasticsearch

async def leaky():
    i = 0
    while i <= 1500:
        async with AsyncElasticsearch(
            "https://localhost:9200",
            basic_auth=("elastic", "changeme"),
            verify_certs=False,
            node_class="httpxasync",
        ) as es:
            await es.info()
        i += 1
        if i % 100 == 0:
            print(i)

if __name__ == "__main__":
    asyncio.run(leaky())

httpx

There's still a leak, maybe? But it's smaller in terms of magnitude and has the same ceiling at some point.

@mortenb-buypass
Copy link

This is a server side issue, not actually elastic client, you need to limit the number of simultaneous connection like with httpx:

timeout = httpx.Timeout(10.0, connect=60.0)
limits = httpx.Limits(max_keepalive_connections=5, max_connections=15)
s = httpx.AsyncClient(http1=False, http2=True, timeout=timeout, limits=limits)

Great to see the linear graphs, it means there is no memory leak, the server is just caching and keeping the connections 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants