Skip to content

Commit

Permalink
Merge pull request #35 from clear-street/awheelock/sc-61051/gestalt-r…
Browse files Browse the repository at this point in the history
…esolve-warning-coroutine-is-not

Remove async in vault provider
  • Loading branch information
adisunw authored Jul 6, 2023
2 parents e10bfcb + 19de796 commit 22a1385
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
# Label used to access the service container
vault:
# Docker Hub image
image: vault
image: hashicorp/vault:1.13.3
options: --name vault-test
ports:
# Opens tcp port 8200 on the host and service container
Expand Down
2 changes: 1 addition & 1 deletion gestalt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import glob

from typing import Dict, List, Type, Union, Optional, MutableMapping, Text, Any
from typing import Dict, List, Type, Union, Optional, Text, Any
import yaml
import re
import json
Expand Down
25 changes: 12 additions & 13 deletions gestalt/vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from jsonpath_ng import parse # type: ignore
from typing import Optional, Tuple, Any, Dict, Union, List
import hvac # type: ignore
import asyncio
from queue import Queue
import os
from threading import Thread
from retry import retry
Expand All @@ -31,8 +31,8 @@ def __init__(self,
with role and jwt string from kubernetes
"""
self._scheme: str = scheme
self.dynamic_token_queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=0)
self.kubes_token_queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=0)
self.dynamic_token_queue: Queue[Tuple[str, str, str]] = Queue()
self.kubes_token_queue: Queue[Tuple[str, str, str]] = Queue()

self.vault_client = hvac.Client(url=url,
token=token,
Expand Down Expand Up @@ -66,16 +66,15 @@ def __init__(self,
except requests.exceptions.ConnectionError:
raise RuntimeError("Gestalt Error: Couldn't connect to Vault")

dynamic_ttl_renew = Thread(name='dynamic-token-renew',
target=asyncio.run,
daemon=True,
args=(self.worker(
self.dynamic_token_queue), ))
dynamic_ttl_renew = Thread(
name='dynamic-token-renew',
target=self.worker,
daemon=True,
args=(self.dynamic_token_queue, )) # noqa: F841
kubernetes_ttl_renew = Thread(name="kubes-token-renew",
target=asyncio.run,
target=self.worker,
daemon=True,
args=(self.worker(
self.kubes_token_queue), ))
args=(self.kubes_token_queue, ))
kubernetes_ttl_renew.start()

@retry(RuntimeError, delay=3, tries=3) # type: ignore
Expand Down Expand Up @@ -154,15 +153,15 @@ def _set_secrets_ttl(self, requested_data: Dict[str, Any],
secret_expires_dt = last_vault_rotation_dt + timedelta(seconds=ttl)
self._secret_expiry_times[key] = secret_expires_dt

async def worker(self, token_queue: Any) -> None:
def worker(self, token_queue: Queue) -> None: # type: ignore
"""
Worker function to renew lease on expiry
"""

try:
while True:
if not token_queue.empty():
token_type, token_id, token_duration = token = await token_queue.get(
token_type, token_id, token_duration = token = token_queue.get(
)
if token_type == "kubernetes":
self.vault_client.auth.token.renew(token_id)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def readme():
reqs_list = list(map(lambda x: x.rstrip(), reqs))

setup(name='gestalt-cfg',
version='3.3.0',
version='3.3.1',
description='A sensible configuration library for Python',
long_description=readme(),
long_description_content_type="text/markdown",
Expand Down
25 changes: 11 additions & 14 deletions tests/test_gestalt.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

from gestalt.vault import Vault
from gestalt import merge_into
import asyncio
import pytest
import os
import gestalt
import hvac
from queue import Queue


# Testing member function
Expand Down Expand Up @@ -541,8 +541,7 @@ def test_set_vault_key(nested_setup):
assert secret == "ref+vault://secret/data/testnested#.slack.token"


@pytest.mark.asyncio
async def test_vault_worker_dynamic(mock_vault_workers, mock_vault_k8s_auth):
def test_vault_worker_dynamic(mock_vault_workers, mock_vault_k8s_auth):
mock_dynamic_renew, mock_k8s_renew = mock_vault_workers

mock_sleep = None
Expand All @@ -560,11 +559,11 @@ def except_once(self, **kwargs):

mock_k8s_renew.start.assert_called()

test_token_queue = asyncio.Queue(maxsize=0)
await test_token_queue.put(("dynamic", 1, 100))
test_token_queue = Queue(maxsize=0)
test_token_queue.put(("dynamic", 1, 100))

with pytest.raises(RuntimeError):
await v.worker(test_token_queue)
v.worker(test_token_queue)

mock_sleep.assert_called()
mock_client().sys.renew_lease.assert_called()
Expand All @@ -574,8 +573,7 @@ def except_once(self, **kwargs):
mock_k8s_renew.stop()


@pytest.mark.asyncio
async def test_vault_worker_k8s(mock_vault_workers):
def test_vault_worker_k8s(mock_vault_workers):
mock_dynamic_renew, mock_k8s_renew = mock_vault_workers

mock_sleep = None
Expand All @@ -592,11 +590,11 @@ def except_once(self, **kwargs):

mock_k8s_renew.start.assert_called()

test_token_queue = asyncio.Queue(maxsize=0)
await test_token_queue.put(("kubernetes", 1, 100))
test_token_queue = Queue(maxsize=0)
test_token_queue.put(("kubernetes", 1, 100))

with pytest.raises(RuntimeError):
await v.worker(test_token_queue)
v.worker(test_token_queue)

mock_sleep.assert_called()
mock_client().auth.token.renew.assert_called()
Expand All @@ -606,8 +604,7 @@ def except_once(self, **kwargs):
mock_k8s_renew.stop()


@pytest.mark.asyncio
async def test_vault_start_dynamic_lease(mock_vault_workers):
def test_vault_start_dynamic_lease(mock_vault_workers):
mock_response = {
"lease_id": "1",
"lease_duration": 5,
Expand All @@ -622,7 +619,7 @@ async def test_vault_start_dynamic_lease(mock_vault_workers):
mock_dynamic_token_queue = Mock()
mock_kube_token_queue = Mock()
with patch(
"gestalt.vault.asyncio.Queue",
"gestalt.vault.Queue",
side_effect=[mock_dynamic_token_queue,
mock_kube_token_queue]) as mock_queues:

Expand Down

0 comments on commit 22a1385

Please sign in to comment.