Skip to content

Commit

Permalink
Add first draft of standby_cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
phvalguima committed Oct 23, 2023
1 parent da3c5d1 commit b6251b4
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 2 deletions.
6 changes: 6 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ set-tls-private-key:
private-key:
type: string
description: The content of private key for communications with clients. Content will be auto-generated if this option is not specified.

promote-standby-cluster:
description: Promotes the standby cluster of choice to a leader. Must be run against the leader unit of the standby cluster's charm.

demote-primary-cluster:
description: Demotes the primary cluster of choice to a standby. Must be run against the leader unit of the primary cluster's charm.
5 changes: 5 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
database:
interface: postgresql_client
db:
Expand All @@ -51,6 +53,9 @@ provides:
interface: grafana_dashboard

requires:
async-replica:
interface: async_replication
limit: 1
certificates:
interface: tls-certificates
limit: 1
Expand Down
2 changes: 2 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
WORKLOAD_OS_USER,
)
from patroni import NotReadyError, Patroni
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
Expand Down Expand Up @@ -152,6 +153,7 @@ def __init__(self, *args):
postgresql_db_port = ServicePort(5432, name="database")
patroni_api_port = ServicePort(8008, name="api")
self.service_patcher = KubernetesServicePatch(self, [postgresql_db_port, patroni_api_port])
self.async_manager = PostgreSQLAsyncReplication(self)

def _generate_metrics_jobs(self, enable_tls: bool) -> Dict:
"""Generate spec for Prometheus scraping."""
Expand Down
15 changes: 13 additions & 2 deletions src/patroni.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ def render_patroni_yml_file(
# Open the template patroni.yml file.
with open("templates/patroni.yml.j2", "r") as file:
template = Template(file.read())

primary = self._charm.async_manager.get_primary_data()

# Render the template file with the correct values.
rendered = template.render(
connectivity=connectivity,
Expand All @@ -343,8 +346,12 @@ def render_patroni_yml_file(
is_no_sync_member=is_no_sync_member,
namespace=self._namespace,
storage_path=self._storage_path,
superuser_password=self._superuser_password,
replication_password=self._replication_password,
superuser_password=primary["superuser-password"]
if primary
else self._superuser_password,
replication_password=primary["replication-password"]
if primary
else self._replication_password,
rewind_user=REWIND_USER,
rewind_password=self._rewind_password,
enable_pgbackrest=stanza is not None,
Expand All @@ -355,6 +362,10 @@ def render_patroni_yml_file(
minority_count=self._members_count // 2,
version=self.rock_postgresql_version.split(".")[0],
pg_parameters=parameters,
standby_cluster_endpoint=primary["endpoint"] if primary else None,
extra_replication_endpoints={"{}/32".format(primary["endpoint"])}
if primary
else self._charm.async_manager.standby_endpoints(),
)
self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644)

Expand Down
219 changes: 219 additions & 0 deletions src/relations/async_replication.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.

"""Implements the state-machine.
1) First async replication relation is made: both units get blocked waiting for a leader
2) User runs the promote action against one of the clusters
3) The cluster moves leader and sets the async-replication data, marking itself as leader
4) The other units receive that new information and update themselves to become standby-leaders.
"""

import json
import logging
from typing import Dict, Set

from lightkube import Client
from lightkube.resources.core_v1 import Endpoints
from ops.charm import (
ActionEvent,
CharmBase,
)
from ops.framework import Object
from ops.model import (
Unit,
)

logger = logging.getLogger(__name__)


# Endpoint names declared in metadata.yaml for the two sides of the
# async_replication interface ("provides" and "requires" respectively).
ASYNC_PRIMARY_RELATION = "async-primary"
ASYNC_REPLICA_RELATION = "async-replica"


class MoreThanOnePrimarySelectedError(Exception):
    """Raised when more than one unit across the async relations has marked itself as elected primary."""


class PostgreSQLAsyncReplication(Object):
    """Defines the async-replication management logic.

    Observes both async relation endpoints (treated interchangeably for data
    purposes) plus the promote/demote actions, and reconfigures Patroni when a
    primary cluster is elected.
    """

    def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION) -> None:
        """Set up event observers and collect the async relations.

        Args:
            charm: the PostgreSQL charm this object is attached to.
            relation_name: key used for the parent Object; defaults to the
                primary-side endpoint name.
        """
        super().__init__(charm, relation_name)
        self.relation_name = relation_name
        self.charm = charm
        self.framework.observe(
            self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed
        )
        self.framework.observe(
            self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_primary_changed
        )
        self.framework.observe(
            self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster
        )
        self.framework.observe(
            self.charm.on.demote_primary_cluster_action, self._on_demote_primary_cluster
        )

        # We treat both relations above as actually the same.
        # The big difference appears only at promote/demote actions.
        self.relation_set = {
            *set(self.charm.model.relations[ASYNC_PRIMARY_RELATION]),
            *set(self.charm.model.relations[ASYNC_REPLICA_RELATION]),
        }
        self.container = self.charm.unit.get_container("postgresql")

    @property
    def endpoint(self) -> str:
        """Assumes the endpoint is the same, disregard if we are a primary or standby cluster."""
        # Any of the async relations shares the same binding, so the first one found wins.
        for rel in self.relation_set:
            return str(self.charm.model.get_binding(rel).network.ingress_address)
        return None

    def standby_endpoints(self) -> Set[str]:
        """Returns the set of IPs used by each standby unit with a /32 mask."""
        standby_endpoints = set()
        for rel in self.relation_set:
            for unit in rel.units:
                # A unit without the "elected" marker is a standby.
                if not rel.data[unit].get("elected", None):
                    standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"])))
        return standby_endpoints

    def get_primary_data(self) -> Dict[str, str]:
        """Returns the primary info, if available.

        Returns:
            A dict with the elected primary's "endpoint",
            "replication-password" and "superuser-password", or None when this
            unit is itself the elected primary or no primary was elected yet.
        """
        for rel in self.relation_set:
            for unit in rel.units:
                if unit.name == self.charm.unit.name:
                    # If this unit is the leader, then return None
                    # NOTE(review): this bails out on the first unit seen whose
                    # name matches ours, before scanning the remaining units for
                    # "elected" data — confirm iteration order cannot hide an
                    # elected remote unit.
                    return None
                if rel.data[unit].get("elected", None):
                    elected_data = json.loads(rel.data[unit]["elected"])
                    return {
                        "endpoint": str(elected_data["endpoint"]),
                        "replication-password": elected_data["replication-password"],
                        "superuser-password": elected_data["superuser-password"],
                    }
        return None

    def _on_primary_changed(self, _):
        """Triggers a configuration change."""
        primary = self._check_if_primary_already_selected()
        if not primary:
            return

        if primary.name == self.charm.unit.name:
            # This unit is the leader, generate a new configuration and leave.
            # There is nothing to do for the leader.
            self.charm.update_config()
            self.container.start(self.charm._postgresql_service)
            return

        self.container.stop(self.charm._postgresql_service)

        # Standby units must delete their data folder
        # Delete the K8S endpoints that tracks the cluster information, including its id.
        # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't
        # work after the database service is stopped on Pebble.
        try:
            client = Client()
            client.delete(
                Endpoints,
                name=f"patroni-{self.charm._name}",
                namespace=self.charm._namespace,
            )
            client.delete(
                Endpoints,
                name=f"patroni-{self.charm._name}-config",
                namespace=self.charm._namespace,
            )

            self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output()
            self.charm._create_pgdata(self.container)

            self.charm.update_config()
        except Exception:
            # Best-effort cleanup: keep going so the service is restarted below,
            # but record the failure instead of silently swallowing it.
            logger.exception("Failed to reset standby state before restarting the service")
        self.container.start(self.charm._postgresql_service)

    def _get_primary_candidates(self):
        """Returns the units on the primary-side relation, or an empty list."""
        rel = self.model.get_relation(ASYNC_PRIMARY_RELATION)
        return rel.units if rel else []

    def _check_if_primary_already_selected(self) -> Unit:
        """Returns the unit if a primary is present.

        Raises:
            MoreThanOnePrimarySelectedError: if two or more units carry the
                "elected" marker across the async relations.
        """
        result = None
        if not self.relation_set:
            return None
        for rel in self.relation_set:
            for unit in rel.units:
                if "elected" in rel.data[unit] and not result:
                    result = unit
                elif result:
                    raise MoreThanOnePrimarySelectedError
        return result

    def _on_promote_standby_cluster(self, event: ActionEvent) -> None:
        """Moves a standby cluster to a primary, if none is present."""
        if (
            "cluster_initialised" not in self.charm._peers.data[self.charm.app]
            or not self.charm._patroni.member_started
        ):
            event.fail("Cluster not initialized yet.")
            return

        if not self.charm.unit.is_leader():
            event.fail("Not the charm leader unit.")
            return

        # Let the exception error the unit
        unit = self._check_if_primary_already_selected()
        if unit:
            event.fail(f"Cannot promote - {unit.name} is already primary: demote it first")
            return

        # If this is a standby-leader, then execute switchover logic
        # TODO

        # Now, publish that this unit is the leader
        if not self.endpoint:
            event.fail("No relation found.")
            return
        primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION)
        if not primary_relation:
            event.fail("No primary relation")
            return

        primary_relation.data[self.charm.unit]["elected"] = json.dumps(
            {
                "endpoint": self.endpoint,
                "replication-password": self.charm._patroni._replication_password,
                "superuser-password": self.charm._patroni._superuser_password,
            }
        )

    def _on_demote_primary_cluster(self, event: ActionEvent) -> None:
        """Moves a primary cluster to standby."""
        if (
            "cluster_initialised" not in self.charm._peers.data[self.charm.app]
            or not self.charm._patroni.member_started
        ):
            event.fail("Cluster not initialized yet.")
            return

        if not self.charm.unit.is_leader():
            event.fail("Not the charm leader unit.")
            return

        # Let the exception error the unit
        unit = self._check_if_primary_already_selected()
        # Fix: guard against unit being None before dereferencing unit.name
        # (the original formatted unit.name even when no primary was elected),
        # and report a demote failure, not a promote one.
        if not unit:
            event.fail("Cannot demote - no primary cluster elected")
            return
        if unit.name != self.charm.unit.name:
            event.fail(f"Cannot demote - {unit.name} is primary")
            return

        # If this is a standby-leader, then execute switchover logic
        # TODO

        # Withdraw leadership: drop the "elected" marker from this unit's own
        # databag on the primary relation.
        # Fix: the original did `del self._get_primary_candidates()[self.charm.unit].data["elected"]`,
        # which indexes the (non-subscriptable) unit set and uses a non-existent
        # `unit.data` attribute; relation data is reached via Relation.data.
        primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION)
        if not primary_relation:
            event.fail("No primary relation")
            return
        del primary_relation.data[self.charm.unit]["elected"]
11 changes: 11 additions & 0 deletions templates/patroni.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ bootstrap:
command: pgbackrest --stanza={{ restore_stanza }} --pg1-path={{ storage_path }}/pgdata --set={{ backup_id }} --type=immediate --target-action=promote restore
no_params: True
keep_existing_recovery_conf: True
{% elif standby_cluster_endpoint %}
standby_cluster:
host: {{ standby_cluster_endpoint }}
port: 5432
create_replica_methods: ["backup_restore", "basebackup"]
{% else %}
initdb:
- auth-host: md5
Expand All @@ -58,6 +63,9 @@ bootstrap:
pg_hba:
- {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5
- {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5
{%- for endpoint in extra_replication_endpoints %}
- {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }} md5
{%- endfor %}
bypass_api_service: true
log:
dir: /var/log/postgresql
Expand Down Expand Up @@ -113,6 +121,9 @@ postgresql:
- {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5
{%- endif %}
- {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5
{%- for e in extra_replication_endpoints %}
- {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ e }} md5
{%- endfor -%}
{%- for endpoint in endpoints %}
- {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}.{{ namespace }}.svc.cluster.local md5
{%- endfor %}
Expand Down

0 comments on commit b6251b4

Please sign in to comment.