Copy pieces of operational DB to archive DB daily
Add a Cloud Scheduler job that posts a message to a Pub/Sub topic every
day at 12:00, triggering a Cloud Function which copies up to one week of
not-yet-archived data from the operational DB (PostgreSQL) to the
archive DB (BigQuery). This replaces the previous approach of writing
through a rate-limited mux.
spbnick committed Nov 6, 2024
1 parent 5dc7a81 commit 185c939
Showing 7 changed files with 172 additions and 1 deletion.
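Once deployed, an archival pass can also be triggered by hand by publishing an empty JSON message to the trigger topic, exactly as the daily scheduler job does. A minimal sketch, based on the new test below, assuming the GCP_PROJECT and KCIDB_ARCHIVE_TRIGGER_TOPIC environment variables point at the deployed project and topic:

import os
import kcidb

# Publish an empty message to the archive trigger topic, mirroring
# the daily Cloud Scheduler job; the archive function picks it up.
publisher = kcidb.mq.JSONPublisher(
    os.environ["GCP_PROJECT"],
    os.environ["KCIDB_ARCHIVE_TRIGGER_TOPIC"]
)
publisher.publish({})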
cloud: 8 changes (7 additions & 1 deletion)
@@ -126,6 +126,7 @@ function execute_command() {
declare -r cache_bucket_name="${project}_${prefix}cache"
declare -r pick_notifications_trigger_topic="${prefix}pick_notifications_trigger"
declare -r purge_db_trigger_topic="${prefix}purge_db_trigger"
declare -r archive_trigger_topic="${prefix}archive_trigger"
declare -r cache_redirect_function_name="cache_redirect"
declare cache_redirector_url="https://${FUNCTION_REGION}"
declare cache_redirector_url+="-${project}.cloudfunctions.net/"
@@ -301,6 +302,7 @@ function execute_command() {
--updated-topic="$updated_topic"
--load-queue-trigger-topic="$load_queue_trigger_topic"
--purge-db-trigger-topic="$purge_db_trigger_topic"
--archive-trigger-topic="$archive_trigger_topic"
--updated-urls-topic="$updated_urls_topic"
--spool-collection-path="$spool_collection_path"
--extra-cc="$extra_cc"
@@ -394,6 +396,7 @@ function execute_command() {
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-db-trigger-topic "$purge_db_trigger_topic" \
--archive-trigger-topic "$archive_trigger_topic" \
--smtp-topic="$smtp_topic" \
--smtp-subscription="$smtp_subscription" \
--cost-topic="$cost_topic" \
@@ -411,6 +414,7 @@ function execute_command() {
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-db-trigger-topic "$purge_db_trigger_topic" \
--archive-trigger-topic "$archive_trigger_topic" \
--updated-urls-topic="$updated_urls_topic" \
--updated-topic="$updated_topic" \
--cache-redirect-function-name="$cache_redirect_function_name" \
@@ -422,7 +426,8 @@ function execute_command() {
--load-queue-trigger-topic="$load_queue_trigger_topic" \
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
---purge-db-trigger-topic="$purge_db_trigger_topic"
+--purge-db-trigger-topic="$purge_db_trigger_topic" \
+--archive-trigger-topic="$archive_trigger_topic"
sections_run "$sections" submitters_deploy \
"$project" "$new_topic" "${submitters[@]}"
# Handle "shutdown" command
@@ -456,6 +461,7 @@ function execute_command() {
--pick-notifications-trigger-topic \
"$pick_notifications_trigger_topic" \
--purge-db-trigger-topic "$purge_db_trigger_topic" \
--archive-trigger-topic "$archive_trigger_topic" \
--new-topic="$new_topic" \
--new-load-subscription="$new_load_subscription" \
--new-debug-subscription="$new_debug_subscription" \
kcidb/cloud/functions.sh: 16 changes (16 additions & 0 deletions)
@@ -18,6 +18,7 @@ declare _FUNCTIONS_SH=
# --updated-topic=NAME
# --load-queue-trigger-topic=NAME
# --purge-db-trigger-topic=NAME
# --archive-trigger-topic=NAME
# --updated-urls-topic=NAME
# --cache-bucket-name=NAME
# --cache-redirector-url=URL
@@ -43,6 +44,7 @@ function functions_env() {
updated_publish updated_topic \
load_queue_trigger_topic \
purge_db_trigger_topic \
archive_trigger_topic \
updated_urls_topic \
spool_collection_path \
extra_cc \
@@ -78,6 +80,7 @@ function functions_env() {
[KCIDB_UPDATED_QUEUE_TOPIC]="$updated_topic"
[KCIDB_LOAD_QUEUE_TRIGGER_TOPIC]="$load_queue_trigger_topic"
[KCIDB_PURGE_DB_TRIGGER_TOPIC]="$purge_db_trigger_topic"
[KCIDB_ARCHIVE_TRIGGER_TOPIC]="$archive_trigger_topic"
[KCIDB_UPDATED_URLS_TOPIC]="$updated_urls_topic"
[KCIDB_SELECTED_SUBSCRIPTIONS]=""
[KCIDB_SPOOL_COLLECTION_PATH]="$spool_collection_path"
@@ -137,6 +140,7 @@ function functions_env() {
# --spool-collection-path=PATH
# --cache-redirect-function-name=NAME
# --env-yaml=YAML
# --archive-trigger-topic=NAME
function functions_deploy() {
declare params
params="$(getopt_vars sections project prefix source \
@@ -148,6 +152,7 @@ function functions_deploy() {
spool_collection_path \
cache_redirect_function_name \
env_yaml \
archive_trigger_topic \
-- "$@")"
eval "$params"

@@ -171,6 +176,15 @@ function functions_deploy() {
trigger_event+="document.create"
declare trigger_resource="projects/$project/databases/(default)/documents/"
trigger_resource+="${spool_collection_path}/{notification_id}"

function_deploy "$sections" "$source" "$project" "$prefix" \
archive true \
--env-vars-file "$env_yaml_file" \
--trigger-topic "${archive_trigger_topic}" \
--memory 2048MB \
--max-instances=1 \
--timeout 540

function_deploy "$sections" "$source" "$project" "$prefix" \
purge_db true \
--env-vars-file "$env_yaml_file" \
@@ -243,6 +257,8 @@ function _functions_withdraw_or_shutdown() {
cache_redirect_function_name \
-- "$@")"
eval "$params"
"function_$action" "$sections" "$project" "$prefix" \
archive
"function_$action" "$sections" "$project" "$prefix" \
purge_db
"function_$action" "$sections" "$project" "$prefix" \
kcidb/cloud/pubsub.sh: 6 changes (6 additions & 0 deletions)
@@ -143,6 +143,7 @@ function pubsub_subscription_withdraw() {
# --cost-upd-service-account=NAME
# --cost-mon-service=NAME
# --iss-ed-service=NAME
# --archive-trigger-topic=NAME
function pubsub_deploy() {
declare params
params="$(getopt_vars project \
@@ -160,6 +161,7 @@ function pubsub_deploy() {
cost_upd_service_account \
cost_mon_service \
iss_ed_service \
archive_trigger_topic \
-- "$@")"
eval "$params"
declare project_number
@@ -198,6 +200,7 @@ function pubsub_deploy() {
--message-retention-duration=12h
pubsub_topic_deploy "$project" "${pick_notifications_trigger_topic}"
pubsub_topic_deploy "$project" "${purge_db_trigger_topic}"
pubsub_topic_deploy "$project" "${archive_trigger_topic}"
pubsub_topic_deploy "$project" "${updated_urls_topic}"
if [ -n "$smtp_topic" ]; then
pubsub_topic_deploy "$project" "$smtp_topic"
@@ -242,6 +245,7 @@ function pubsub_deploy() {
# --smtp-subscription=NAME
# --cost-topic=NAME
# --cost-upd-service-account=NAME
# --archive-trigger-topic=NAME
function pubsub_withdraw() {
declare params
params="$(getopt_vars project \
@@ -257,6 +261,7 @@ function pubsub_withdraw() {
smtp_topic smtp_subscription \
cost_topic \
cost_upd_service_account \
archive_trigger_topic \
-- "$@")"
eval "$params"
declare project_number
@@ -275,6 +280,7 @@ function pubsub_withdraw() {
pubsub_subscription_withdraw "$project" "$new_debug_subscription"
pubsub_subscription_withdraw "$project" "$new_load_subscription"
pubsub_topic_withdraw "$project" "$new_topic"
pubsub_topic_withdraw "$project" "$archive_trigger_topic"
pubsub_topic_withdraw "$project" "$load_queue_trigger_topic"
pubsub_topic_withdraw "$project" "$pick_notifications_trigger_topic"
pubsub_topic_withdraw "$project" "$updated_urls_topic"
kcidb/cloud/scheduler.sh: 7 changes (7 additions & 0 deletions)
@@ -67,13 +67,15 @@ function scheduler_job_withdraw() {
# --load-queue-trigger-topic=NAME
# --pick-notifications-trigger-topic=NAME
# --purge-db-trigger-topic=NAME
# --archive-trigger-topic=NAME
function scheduler_deploy() {
declare params
params="$(getopt_vars project \
prefix \
load_queue_trigger_topic \
pick_notifications_trigger_topic \
purge_db_trigger_topic \
archive_trigger_topic \
-- "$@")"
eval "$params"
# Deploy the jobs
@@ -90,6 +92,10 @@ function scheduler_deploy() {
"$project" "${prefix}purge_sm_db_trigger" \
"$purge_db_trigger_topic" '0 7 * * *' \
'{"database": "sm", "timedelta": {"delta": {"days": 30}}}'
scheduler_job_pubsub_deploy \
"$project" "${prefix}archive_trigger" \
"$archive_trigger_topic" '0 12 * * *' \
'{}'
}

# Withdraw from the scheduler
@@ -101,6 +107,7 @@ function scheduler_withdraw() {
scheduler_job_withdraw "$project" "${prefix}pick_notifications_trigger"
scheduler_job_withdraw "$project" "${prefix}purge_op_db_trigger"
scheduler_job_withdraw "$project" "${prefix}purge_sm_db_trigger"
scheduler_job_withdraw "$project" "${prefix}archive_trigger"
}

fi # _SCHEDULER_SH
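The new job's '0 12 * * *' schedule fires once a day at 12:00, clear of the 07:00 purge job above. An illustrative sketch of the schedule's semantics, using the third-party croniter package (not used by this repository):

from datetime import datetime, timezone
from croniter import croniter  # third-party, for illustration only

it = croniter("0 12 * * *", datetime(2024, 11, 6, tzinfo=timezone.utc))
print(it.get_next(datetime))  # 2024-11-06 12:00:00+00:00
print(it.get_next(datetime))  # 2024-11-07 12:00:00+00:00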
kcidb/cloud/sections.sh: 1 change (1 addition & 0 deletions)
@@ -16,6 +16,7 @@ declare -A -r SECTIONS=(
["firestore"]="Firestore database"
["storage"]="Google cloud storage"
["functions.purge_db"]="Cloud Functions: kcidb_purge_db()"
["functions.archive"]="Cloud Functions: kcidb_archive()"
["functions.pick_notifications"]="Cloud Functions: kcidb_pick_notifications()"
["functions.send_notification"]="Cloud Functions: kcidb_send_notification()"
["functions.spool_notifications"]="Cloud Functions: kcidb_spool_notifications()"
main.py: 49 changes (49 additions & 0 deletions)
@@ -417,6 +417,55 @@ def kcidb_pick_notifications(data, context):
spool_client.ack(notification_id)


def kcidb_archive(event, context):
    """
    Transfer data that is out of the editing window (to be enforced) and
    hasn't been transferred yet, from the operational database into the
    archive database.
    """
    op_client = get_db_client(OPERATIONAL_DATABASE)
    op_now = op_client.get_current_time()
    op_first_modified = op_client.get_first_modified()
    if not op_first_modified:
        LOGGER.info("Operational database is empty, nothing to archive")
        return

    ar_client = get_db_client(ARCHIVE_DATABASE)
    ar_last_modified = ar_client.get_last_modified()

    after = ar_last_modified or \
        (op_first_modified - datetime.timedelta(seconds=1))
    until = min(
        # Add a timespan we can fit in memory and exec time limits
        after + datetime.timedelta(days=7),
        # Subtract editing window (to be enforced)
        op_now - datetime.timedelta(days=14)
    )
    if until <= after:
        LOGGER.info("No data old enough to archive")
        return

    after_str = after.isoformat(timespec='microseconds')
    until_str = until.isoformat(timespec='microseconds')

    # TODO: Transfer data in multiple smaller pieces

    # Fetch the data from the operational database
    # Preserve timestamps!
    LOGGER.info("FETCHING operational database dump for (%s, %s] range",
                after_str, until_str)
    dump = op_client.dump(with_metadata=True, after=after, until=until)

    # Load the data into the archive database
    # Preserve timestamps!
    LOGGER.info("LOADING a dump of %u objects into archive database",
                kcidb.io.SCHEMA.count(dump))
    ar_client.load(dump, with_metadata=True)

    LOGGER.info("ARCHIVED %u objects in (%s, %s] range",
                kcidb.io.SCHEMA.count(dump), after_str, until_str)

def kcidb_purge_db(event, context):
    """
    Purge data from the operational database, older than the optional delta
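To make the range selection in kcidb_archive() concrete, here is a hypothetical walk-through with made-up timestamps (not part of the commit): with an empty archive and the oldest operational row modified 30 days ago, the first run copies the week starting just before that row; subsequent runs advance the window a week at a time until it would intrude on the 14-day editing period:

import datetime

# Reproduce kcidb_archive()'s window arithmetic with example values
op_now = datetime.datetime(2024, 11, 6, 12, 0,
                           tzinfo=datetime.timezone.utc)
ar_last_modified = None  # the archive is still empty
op_first_modified = op_now - datetime.timedelta(days=30)

after = ar_last_modified or \
    (op_first_modified - datetime.timedelta(seconds=1))
until = min(
    after + datetime.timedelta(days=7),   # at most one week per run
    op_now - datetime.timedelta(days=14)  # stay out of the editing window
)
assert after < until  # otherwise nothing is old enough to archive
# First run: a one-week range starting a second before the oldest row
print(f"archiving ({after.isoformat()}, {until.isoformat()}]")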
test_main.py: 86 changes (86 additions & 0 deletions)
@@ -356,3 +356,89 @@ def filter_test_data(data):
    assert dump == client.get_schema()[1].upgrade(
        data_after if purging else data
    )


def test_archive(empty_deployment):
    """Check kcidb_archive() works correctly"""
    # Make empty_deployment appear used to silence pylint warning
    assert empty_deployment is None

    op_client = kcidb.db.Client(os.environ["KCIDB_OPERATIONAL_DATABASE"])
    op_schema = op_client.get_schema()[1]
    ar_client = kcidb.db.Client(os.environ["KCIDB_ARCHIVE_DATABASE"])
    ar_schema = ar_client.get_schema()[1]
    publisher = kcidb.mq.JSONPublisher(
        os.environ["GCP_PROJECT"],
        os.environ["KCIDB_ARCHIVE_TRIGGER_TOPIC"]
    )

    # Empty the archive
    ar_client.empty()

    # Generate timestamps
    ts_now = op_client.get_current_time()
    ts_3w = ts_now - timedelta(days=7 * 3)
    ts_4w = ts_now - timedelta(days=7 * 4)

    def gen_data(id, ts):
        """
        Generate a dataset with one object per type, all using the specified
        timestamp, ID, and origin extracted from the ID.
        """
        assert isinstance(id, str)
        assert isinstance(ts, datetime) and ts.tzinfo
        origin = id.split(":")[0]
        assert origin
        assert origin != id
        base = dict(id=id, origin=origin,
                    _timestamp=ts.isoformat(timespec='microseconds'))
        return dict(
            checkouts=[base | dict()],
            builds=[base | dict(checkout_id=id)],
            tests=[base | dict(build_id=id)],
            issues=[base | dict(version=1)],
            incidents=[base | dict(issue_id=id, issue_version=1)],
            **op_schema.new(),
        )

    # Generate datasets
    data_now = gen_data("archive:now", ts_now)
    data_3w = gen_data("archive:3w", ts_3w)
    data_4w = gen_data("archive:4w", ts_4w)

    # Load data_now into the operational DB
    op_client.load(data_now, with_metadata=True)
    # Trigger and wait for archival (ignore possibility of actual trigger)
    publisher.publish({})
    time.sleep(30)
    # Check data_now doesn't end up in the archive DB
    assert ar_schema.count(ar_client.dump()) == 0

    # Load data_3w and data_4w
    op_client.load(op_schema.merge(data_3w, [data_4w]), with_metadata=True)
    # Trigger and wait for archival (ignore possibility of actual trigger)
    publisher.publish({})
    time.sleep(30)
    # Check data_4w is in the archive database
    dump = ar_client.dump()
    assert all(
        any(obj["id"] == "archive:4w"
            for obj in dump.get(obj_list_name, []))
        for obj_list_name in op_schema.id_fields
    ), "No complete four-week old data in the archive"
    # Check data_3w is not in the archive database
    assert not any(
        any(obj["id"] == "archive:3w"
            for obj in dump.get(obj_list_name, []))
        for obj_list_name in op_schema.id_fields
    ), "Some three-week old data in the archive"
    # Trigger and wait for another archival (ignore chance of actual trigger)
    publisher.publish({})
    time.sleep(30)
    # Check data_3w is now in the archive database
    dump = ar_client.dump()
    assert all(
        any(obj["id"] == "archive:3w"
            for obj in dump.get(obj_list_name, []))
        for obj_list_name in op_schema.id_fields
    ), "No complete three-week old data in the archive"
