diff --git a/CHANGELOG.md b/CHANGELOG.md index 47e60b1..f056180 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) +## [3.4.2] - 2024-08-05 + +### Fixed +- Adding logic to check when the token is about to expire to re-connect. This fix cases for services that are running longer that token's ttl without restarting. Causing requests to get a Permission denied error. + + ## [3.4.1] - 2024-07-12 ### Fixed diff --git a/gestalt/vault.py b/gestalt/vault.py index f906109..653de2a 100644 --- a/gestalt/vault.py +++ b/gestalt/vault.py @@ -1,8 +1,6 @@ import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from queue import Queue -from threading import Thread -from time import sleep from typing import Any, Dict, List, Optional, Tuple, Union import hvac # type: ignore @@ -12,6 +10,9 @@ from retry.api import retry_call from gestalt.provider import Provider +from dateutil.parser import isoparse + +EXPIRATION_THRESHOLD_HOURS = 1 class Vault(Provider): @@ -39,7 +40,7 @@ def __init__( self._scheme: str = scheme self._run_worker = True self.dynamic_token_queue: Queue[Tuple[str, str, str]] = Queue() - self.kubes_token_queue: Queue[Tuple[str, str, str]] = Queue() + self.kubes_token: Optional[Tuple[str, str, str, datetime]] = None self._vault_client: Optional[hvac.Client] = None self._secret_expiry_times: Dict[str, datetime] = dict() @@ -91,31 +92,20 @@ def connect(self) -> None: ) if token is not None: + print("Kubernetes login successful") kubes_token = ( "kubernetes", token["data"]["id"], token["data"]["ttl"], + token["data"]['expire_time'], ) - self.kubes_token_queue.put(kubes_token) + self.kubes_token = kubes_token except hvac.exceptions.InvalidPath: raise RuntimeError( "Gestalt Error: Kubernetes auth couldn't be performed") except requests.exceptions.ConnectionError: raise RuntimeError("Gestalt Error: Couldn't connect to Vault") - dynamic_ttl_renew = Thread( - name="dynamic-token-renew", - target=self.worker, - daemon=True, - args=(self.dynamic_token_queue, ), - ) # noqa: F841 - kubernetes_ttl_renew = Thread( - name="kubes-token-renew", - target=self.worker, - daemon=True, - args=(self.kubes_token_queue, ), - ) - kubernetes_ttl_renew.start() self._is_connected = True def stop(self) -> None: @@ -151,6 +141,9 @@ def get( key): return self._secret_values[key] + # verify if the token still valid, in case not, call connect() + self._validate_token_expiration() + try: response = retry_call( self.vault_client.read, @@ -213,33 +206,32 @@ def _set_secrets_ttl(self, requested_data: Dict[str, Any], secret_expires_dt = last_vault_rotation_dt + timedelta(seconds=ttl) self._secret_expiry_times[key] = secret_expires_dt - def worker(self, token_queue: Queue) -> None: # type: ignore - """ - Worker function to renew lease on expiry - """ - try: - while self._run_worker: - if not token_queue.empty(): - token_type, token_id, token_duration = token = token_queue.get( - ) - if token_type == "kubernetes": - self.vault_client.auth.token.renew(token_id) - print("kubernetes token for the app has been renewed") - elif token_type == "dynamic": - self.vault_client.sys.renew_lease(token_id) - print("dynamic token for the app has been renewed") - token_queue.task_done() - token_queue.put_nowait(token) - sleep((token_duration / 3) * 2) - except hvac.exceptions.InvalidPath: - raise RuntimeError( - "Gestalt Error: The lease path or mount is set incorrectly") - except requests.exceptions.ConnectionError: - raise RuntimeError( - "Gestalt Error: Gestalt couldn't connect to Vault") - except Exception as err: - raise RuntimeError(f"Gestalt Error: {err}") - @property def scheme(self) -> str: return self._scheme + + def _validate_token_expiration(self) -> None: + if self.kubes_token is not None: + expire_time = self.kubes_token[3] + # Use isoparse to correctly parse the datetime string + expire_time = isoparse(expire_time) + + # Ensure the parsed time is in UTC + if expire_time.tzinfo is None: + expire_time = expire_time.replace(tzinfo=timezone.utc) + else: + expire_time = expire_time.astimezone(timezone.utc) + + current_time = datetime.now(timezone.utc) + # in hours + delta_time = (expire_time - current_time).total_seconds() / 3600 + + if delta_time < EXPIRATION_THRESHOLD_HOURS: + print(f"Re-authenticating with vault") + self.connect() + else: + print(f"Token still valid for: {delta_time} hours") + else: + print( + f"Can't reconnect, token information: {self.kubes_token}, not valid" + ) diff --git a/requirements.test.txt b/requirements.test.txt index 8f6a296..3f71f88 100644 --- a/requirements.test.txt +++ b/requirements.test.txt @@ -12,3 +12,4 @@ retry==0.9.2 types-retry==0.9.9 jsonpath-ng==1.5.3 pytest-asyncio==0.19.0 +python-dateutil>=2.8.0 diff --git a/requirements.txt b/requirements.txt index be8829c..9c7f928 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ hvac>=1.0.2,<1.1.0 jsonpath-ng==1.5.3 retry==0.9.2 types-retry==0.9.9 +python-dateutil>=2.8.0 +types-python-dateutil>=0.1.0 diff --git a/setup.py b/setup.py index b7a74d3..a998403 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ def readme(): setup( name="gestalt-cfg", - version="3.4.1", + version="3.4.2", description="A sensible configuration library for Python", long_description=readme(), long_description_content_type="text/markdown", diff --git a/tests/conftest.py b/tests/conftest.py index b5094bc..533f349 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,6 +18,7 @@ def request(self, *_, **__): "rotation_period": 60, "ttl": 0, "username": "foo", + "expire_time": "2024-08-15T22:04:49.82981496Z" }, "wrap_info": None, "warnings": None, @@ -79,9 +80,7 @@ def nested_setup(): def mock_vault_workers(): mock_dynamic_renew = Mock() mock_k8s_renew = Mock() - with patch("gestalt.vault.Thread", - side_effect=[mock_dynamic_renew, mock_k8s_renew]): - yield (mock_dynamic_renew, mock_k8s_renew) + return (mock_dynamic_renew, mock_k8s_renew) @pytest.fixture diff --git a/tests/test_gestalt.py b/tests/test_gestalt.py index b398e58..00ba55c 100644 --- a/tests/test_gestalt.py +++ b/tests/test_gestalt.py @@ -557,79 +557,23 @@ def test_set_vault_key(nested_setup): assert secret == "ref+vault://secret/data/testnested#.slack.token" -def test_vault_lazy_connect(mock_vault_workers, mock_vault_k8s_auth): +def test_vault_lazy_connect(mock_vault_k8s_auth): with patch("gestalt.vault.hvac.Client") as mock_client: v = Vault(role="test-role", jwt="test-jwt") + v.vault_client.auth.token.lookup_self = Mock( + return_value={ + "data": { + "id": "foo", + "ttl": "foo", + "expire_time": "2024-08-15T22:04:49.82981496Z" + } + }) assert not v._is_connected v.get("foo", "foo", ".foo") assert v._is_connected mock_client().auth.token.lookup_self.assert_called() -def test_vault_worker_dynamic(mock_vault_workers, mock_vault_k8s_auth): - mock_dynamic_renew, mock_k8s_renew = mock_vault_workers - - mock_sleep = None - - def except_once(self, **kwargs): - # side effect used to exit the worker loop after one call - if mock_sleep.call_count == 1: - raise hvac.exceptions.VaultError("some error") - - with patch("gestalt.vault.sleep", side_effect=except_once, - autospec=True) as mock_sleep: - with patch("gestalt.vault.hvac.Client") as mock_client: - v = Vault(role="test-role", jwt="test-jwt") - v.connect() - - mock_k8s_renew.start.assert_called() - - test_token_queue = Queue(maxsize=0) - test_token_queue.put(("dynamic", 1, 100)) - - with pytest.raises(RuntimeError): - v.worker(test_token_queue) - - mock_sleep.assert_called() - mock_client().sys.renew_lease.assert_called() - mock_k8s_renew.start.assert_called_once() - - mock_dynamic_renew.stop() - mock_k8s_renew.stop() - - -def test_vault_worker_k8s(mock_vault_workers): - mock_dynamic_renew, mock_k8s_renew = mock_vault_workers - - mock_sleep = None - - def except_once(self, **kwargs): - # side effect used to exit the worker loop after one call - if mock_sleep.call_count == 1: - raise hvac.exceptions.VaultError("some error") - - with patch("gestalt.vault.sleep", side_effect=except_once, - autospec=True) as mock_sleep: - with patch("gestalt.vault.hvac.Client") as mock_client: - v = Vault(role="test-role", jwt="test-jwt") - v.connect() - - mock_k8s_renew.start.assert_called() - - test_token_queue = Queue(maxsize=0) - test_token_queue.put(("kubernetes", 1, 100)) - - with pytest.raises(RuntimeError): - v.worker(test_token_queue) - - mock_sleep.assert_called() - mock_client().auth.token.renew.assert_called() - mock_k8s_renew.start.assert_called_once() - - mock_dynamic_renew.stop() - mock_k8s_renew.stop() - - def test_vault_start_dynamic_lease(mock_vault_workers): mock_response = { "lease_id": "1", @@ -643,11 +587,11 @@ def test_vault_start_dynamic_lease(mock_vault_workers): return_value=mock_response) with mock_vault_client_patch as mock_vault_client_read: mock_dynamic_token_queue = Mock() - mock_kube_token_queue = Mock() + mock_kube_token = ("kubernetes", "hvs.CAESIEkz-UO8yvfC8v", "2764799") with patch( "gestalt.vault.Queue", - side_effect=[mock_dynamic_token_queue, mock_kube_token_queue], - ) as mock_queues: + side_effect=[mock_dynamic_token_queue], + ) as mock_queue: v = Vault(role=None, jwt=None) g = gestalt.Gestalt() g.add_config_file("./tests/testvault/testmount.json") @@ -657,9 +601,9 @@ def test_vault_start_dynamic_lease(mock_vault_workers): mock_vault_client_read.assert_called() mock_dynamic_token_queue.put_nowait.assert_called() + assert mock_kube_token == ("kubernetes", "hvs.CAESIEkz-UO8yvfC8v", + "2764799") mock_vault_client_read.stop() mock_dynamic_token_queue.stop() - mock_kube_token_queue.stop() - mock_queues.stop() - mock_vault_client_read.stop() + mock_queue.stop()