Skip to content

Commit

Permalink
First support finished:
Browse files Browse the repository at this point in the history
Deploy two models, each with 1x postgresql
Then, configure async replication as follows:
  $ juju switch psql-1
  $ juju offer postgresql-k8s:async-primary async-primary  # async-primary is the relation provided by the leader
  $ juju switch psql-2
  $ juju consume admin/psql-1.async-primary  # consume the primary relation
  $ juju relate postgresql-k8s:async-replica async-primary  # Both units are now related, where postgresql-k8s in model psql-2 is the standby-leader

Now, run the action:
  $ juju run -m psql-1 postgresql-k8s/0 promote-standby-cluster  # move postgresql-k8s in model psql-1 to be the leader cluster

Run the following command to check status (set PATRONI_KUBERNETES_NAMESPACE to the model name):
  $ PATRONI_KUBERNETES_LABELS='{application: patroni, cluster-name: patroni-postgresql-k8s}' \
    PATRONI_KUBERNETES_NAMESPACE=psql-2 \
    PATRONI_KUBERNETES_USE_ENDPOINTS=true \
    PATRONI_NAME=postgresql-k8s-0 \
    PATRONI_REPLICATION_USERNAME=replication \
    PATRONI_SCOPE=patroni-postgresql-k8s \
    PATRONI_SUPERUSER_USERNAME=operator \
      patronictl -c /var/lib/postgresql/data/patroni.yml list

Role should be "Standby leader" and State should be "Running".
  • Loading branch information
phvalguima committed Oct 24, 2023
1 parent b6251b4 commit c4c0adb
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 14 deletions.
98 changes: 85 additions & 13 deletions src/relations/async_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ class MoreThanOnePrimarySelectedError(Exception):
"""Represents more than one primary has been selected."""


def _get_pod_ip(hosts_path: str = "/etc/hosts", hostname_path: str = "/etc/hostname") -> str:
    """Read the hostname and hosts files to quickly figure out this pod's own IP.

    The hostname is read from ``hostname_path``; the address (first field) of the
    first entry in ``hosts_path`` that names this host is returned.
    It should work for any Ubuntu-based image.

    Args:
        hosts_path: hosts file to search. Defaults to ``/etc/hosts``.
        hostname_path: file holding this machine's hostname. Defaults to
            ``/etc/hostname``.

    Returns:
        The address field of the matching hosts entry.

    Raises:
        OSError: if either file cannot be read.
        IndexError: if no hosts entry names this host.
    """
    with open(hostname_path) as f:
        hostname = f.read().strip()

    with open(hosts_path) as f:
        for raw_line in f:
            # Ignore trailing comments; fields may be separated by tabs or spaces.
            fields = raw_line.split("#", 1)[0].split()
            if len(fields) < 2:
                continue
            address, names = fields[0], fields[1:]
            # Compare whole names (or the first label of a fully qualified name)
            # rather than substrings, so "pg-1" never matches the "pg-10" entry.
            if any(name == hostname or name.startswith(hostname + ".") for name in names):
                return address

    # Same exception type as the previous "first match of an empty list" failure,
    # but with a message that says what went wrong.
    raise IndexError(f"no entry for hostname {hostname!r} found in {hosts_path}")


class PostgreSQLAsyncReplication(Object):
"""Defines the async-replication management logic."""

Expand All @@ -46,7 +59,7 @@ def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION
self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed
)
self.framework.observe(
self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_primary_changed
self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_standby_changed
)
self.framework.observe(
self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster
Expand Down Expand Up @@ -74,16 +87,18 @@ def standby_endpoints(self) -> Set[str]:
"""Returns the set of IPs used by each standby unit with a /32 mask."""
standby_endpoints = set()
for rel in self.relation_set:
for unit in rel.units:
for unit in self._all_units(rel):
if not rel.data[unit].get("elected", None):
standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"])))
if "pod-address" in rel.data[unit]:
standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"])))
return standby_endpoints

def get_primary_data(self) -> Dict[str, str]:
"""Returns the primary info, if available."""
for rel in self.relation_set:
for unit in rel.units:
if unit.name == self.charm.unit.name:
for unit in self._all_units(rel):
if "elected" in rel.data[unit] and unit.name == self.charm.unit.name:
# If this unit is the leader, then return None
return None
if rel.data[unit].get("elected", None):
Expand All @@ -95,21 +110,65 @@ def get_primary_data(self) -> Dict[str, str]:
}
return None

def _all_units(self, relation):
    """Return the units listed on ``relation.units`` plus ``self.charm.unit``, as a set."""
    members = set(relation.units)
    members.add(self.charm.unit)
    return members

def _all_replica_published_pod_ips(self) -> bool:
    """Tell whether every non-elected unit has a "pod-address" in its databag.

    Walks every relation in ``self.relation_set`` and every unit returned by
    ``self._all_units`` for it.

    Returns:
        False as soon as a unit with neither "elected" nor "pod-address" in its
        relation data is found, True otherwise (including when there is nothing
        to inspect).
    """
    databags = (
        rel.data[unit] for rel in self.relation_set for unit in self._all_units(rel)
    )
    # A unit flagged "elected" is the leader unit: it will not publish its own
    # pod address, so it is exempt from the check.
    return all("elected" in bag or "pod-address" in bag for bag in databags)

def _on_primary_changed(self, _):
"""Triggers a configuration change."""
"""Triggers a configuration change in the primary units."""
primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION)
if not primary_relation:
return

primary = self._check_if_primary_already_selected()
if not primary:
return

if primary.name == self.charm.unit.name:
# This unit is the leader, generate a new configuration and leave.
# There is nothing to do for the leader.
self.charm.update_config()
self.container.start(self.charm._postgresql_service)
if primary.name != self.charm.unit.name:
# This unit is not the elected primary; once the primary has been configured,
# it will trigger a new relation-changed event
return

if not self._all_replica_published_pod_ips():
# We will have more events happening, no need for retrigger
return

# This unit is the leader, generate a new configuration and leave.
# There is nothing to do for the leader.
self.container.stop(self.charm._postgresql_service)
self.charm.update_config()
self.container.start(self.charm._postgresql_service)

# Retrigger the other units' async-replica-changed
primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "true"

def _on_standby_changed(self, _):
"""Triggers a configuration change."""
primary = self._check_if_primary_already_selected()
if not primary:
return

replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION)
if not replica_relation:
return

# Check if we have already published pod-address. If not, then we are waiting
# for the leader to catch all the pod ips and restart itself
if "pod-address" not in replica_relation.data[self.charm.unit]:
replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip()
# Finish here and wait for the retrigger from the primary cluster
return

self.container.stop(self.charm._postgresql_service)
# Standby units must delete their data folder
# Delete the K8S endpoints that tracks the cluster information, including its id.
# This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't
Expand Down Expand Up @@ -145,10 +204,10 @@ def _check_if_primary_already_selected(self) -> Unit:
if not self.relation_set:
return None
for rel in self.relation_set:
for unit in rel.units:
for unit in self._all_units(rel):
if "elected" in rel.data[unit] and not result:
result = unit
elif result:
elif "elected" in rel.data[unit] and result:
raise MoreThanOnePrimarySelectedError
return result

Expand Down Expand Up @@ -190,6 +249,13 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None:
"superuser-password": self.charm._patroni._superuser_password,
}
)

# Now, check whether this unit had originally published its pod IP in the
# replica relation databag. If so, delete it.
replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION)
if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]:
return
del replica_relation.data[self.charm.unit]["pod-address"]
# event.set_result()

def _on_demote_primary_cluster(self, event: ActionEvent) -> None:
Expand All @@ -213,7 +279,13 @@ def _on_demote_primary_cluster(self, event: ActionEvent) -> None:

# If this is a standby-leader, then execute switchover logic
# TODO
primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION)
if not primary_relation or "elected" not in primary_relation.data[self.charm.unit]:
event.fail("No primary relation")
return

# Now, withdraw the claim that this unit is the leader
del self._get_primary_candidates()[self.charm.unit].data["elected"]
del primary_relation.data[self.charm.unit].data["elected"]
if "primary-cluster-ready" in primary_relation.data[self.charm.unit]:
del primary_relation.data[self.charm.unit]["primary-cluster-ready"]
# event.set_result()
2 changes: 1 addition & 1 deletion templates/patroni.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ bootstrap:
standby_cluster:
host: {{ standby_cluster_endpoint }}
port: 5432
create_replica_methods: ["backup_restore", "basebackup"]
create_replica_methods: ["basebackup"]
{% else %}
initdb:
- auth-host: md5
Expand Down

0 comments on commit c4c0adb

Please sign in to comment.