Skip to content

Commit

Permalink
introduce automated credential rotation for BDBA
Browse files Browse the repository at this point in the history
  • Loading branch information
TuanAnh17N committed Feb 3, 2025
1 parent eef0f6b commit 758dcfc
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 1 deletion.
106 changes: 106 additions & 0 deletions cfg_mgmt/bdba.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import datetime
import logging

import requests

import cfg_mgmt
import model.bdba

logger = logging.getLogger(__name__)


def get_api_key_expiry(
cfg_element: model.bdba.BDBAConfig
) -> datetime.datetime:
credentials = cfg_element.credentials()
headers = {'Authorization': f'Bearer {credentials.token()}'}

response = requests.get(
f'{cfg_element.api_url()}/api/key/',
headers=headers,
verify=cfg_element.tls_verify(),
)

response.raise_for_status()

key_info = response.json()
expiry_timestamp = key_info['key'].get('expires')

if not expiry_timestamp:
raise ValueError('No expiration timestamp found for the API key.')

return datetime.datetime.fromisoformat(expiry_timestamp).replace(tzinfo=datetime.timezone.utc)


def rotate_cfg_element(
cfg_element: model.bdba.BDBAConfig,
cfg_factory: model.ConfigFactory,
) -> tuple[cfg_mgmt.revert_function, dict, model.bdba.BDBAConfig]:
logger.info(f'Rotating API key for {cfg_element.name()}')

credentials = cfg_element.credentials()
headers = {'Authorization': f'Bearer {credentials.token()}'}

# Request a new API key
response = requests.post(
f'{cfg_element.api_url()}/api/key/',
json={'validity': 15379200}, # 178 days
headers=headers,
verify=cfg_element.tls_verify(),
)
response.raise_for_status()

new_key_info = response.json()
logger.info(f'New API key response: {new_key_info}')

new_key = new_key_info['key']['value']

raw_cfg = cfg_element.raw.copy()
raw_cfg['credentials']['token'] = new_key

updated_cfg_element = model.bdba.BDBAConfig(
name=cfg_element.name(),
raw_dict=raw_cfg,
type_name=cfg_element._type_name
)

secret_id = {'api_key': new_key}

def no_op():
logger.warning('No rollback possible for BDBA key rotation.')

return no_op, secret_id, updated_cfg_element


def delete_config_secret(
cfg_element: model.bdba.BDBAConfig,
) -> model.bdba.BDBAConfig | None:
logger.info(f'Deleting API key for {cfg_element.name()}')

credentials = cfg_element.credentials()
headers = {'Authorization': f'Bearer {credentials.token}'}

response = requests.delete(
f'{cfg_element.api_url()}/api/key/',
headers=headers,
verify=cfg_element.tls_verify(),
)

if response.status_code == 400:
logger.warning(f'API key for {cfg_element.name()} was already deleted.')
return None

response.raise_for_status()

return None


def validate_for_rotation(cfg_element: model.bdba.BDBAConfig) -> None:
expiry_date = get_api_key_expiry(cfg_element)
remaining_days = (expiry_date - datetime.datetime.now(datetime.timezone.utc)).days

if remaining_days >= 10:
raise ValueError(f'API key for {cfg_element.name()} does not need rotation')

logger.warning((f'API key for {cfg_element.name()} expires in {remaining_days} days. '
'Proceeding with rotation.'))
8 changes: 8 additions & 0 deletions cfg_mgmt/rotate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import cfg_mgmt.aws as cmaws
import cfg_mgmt.alicloud as cmali
import cfg_mgmt.azure as cma
import cfg_mgmt.bdba as cmbd
import cfg_mgmt.btp_application_certificate as cmbac
import cfg_mgmt.btp_service_binding as cmb
import cfg_mgmt.gcp as cmg
Expand Down Expand Up @@ -64,6 +65,9 @@ def delete_expired_secret(
elif type_name == 'alicloud':
delete_func = cmali.delete_config_secret

elif type_name == 'bdba':
delete_func = cmbd.delete_config_secret

elif type_name == 'kubernetes':
try:
cmk.validate_for_rotation(cfg_element)
Expand Down Expand Up @@ -163,6 +167,10 @@ def rotate_cfg_element(
rotation_validation_function = cmk.validate_for_rotation
update_secret_function = cmk.rotate_cfg_element

elif type_name == 'bdba':
rotation_validation_function = cmbd.validate_for_rotation
update_secret_function = cmbd.rotate_cfg_element

if not update_secret_function:
logger.warning(f'{type_name=} is not (yet) supported for automated rotation')
return None
Expand Down
30 changes: 29 additions & 1 deletion cfg_mgmt/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import logging
import os
import time
import typing

import pytimeparse
Expand All @@ -18,6 +19,9 @@

logger = logging.getLogger(__name__)

MAX_RETRIES = 5
RETRY_DELAY = 60 # seconds


def iter_cfg_elements(
cfg_factory: typing.Union[model.ConfigFactory, model.ConfigurationSet],
Expand Down Expand Up @@ -297,7 +301,31 @@ def rotate_config_element_and_persist_in_cfg_repo(
git_helper.add_and_commit(
message=commit_message,
)
git_helper.push('@', target_ref)
for attempt in range(1, MAX_RETRIES + 1):
try:
git_helper.push('@', target_ref)
logger.info(f'Successfully pushed changes to GitHub on attempt {attempt}')
break
except Exception as e:
if attempt < MAX_RETRIES:
logger.warning(f'GitHub push failed (attempt {attempt}/{MAX_RETRIES}): {e}')
logger.info(f'Retrying in {RETRY_DELAY} seconds...')

# pull the latest changes and rebase
latest_commit = git_helper.fetch_head(target_ref)
git_helper.rebase(latest_commit.hexsha)

time.sleep(RETRY_DELAY)
else:
'''
BDBA API keys are immediately invalid once rotated.
If we lose the new key, we cannot recover the old one.
So we log it to allow manual intervention if needed
'''
if cfg_element._type_name == 'bdba':
logger.info(f'NEW BDBA API KEY: {secret_id.get('api_key')}')

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
This expression logs
sensitive data (secret)
as clear text.
raise RuntimeError(f'Failed to push changes to GitHub after {MAX_RETRIES} attempts')

except Exception as e:
logger.warning(f'failed to push updated secret - reverting. Error: {e}')
revert_function()
Expand Down

0 comments on commit 758dcfc

Please sign in to comment.