Skip to content

Commit

Permalink
Merge pull request #1959 from percona/K8SPXC-1546-cache
Browse files Browse the repository at this point in the history
K8SPXC-1546: Cache binlog->gtid set pairs
  • Loading branch information
hors authored Feb 4, 2025
2 parents c0ddc81 + 2ba4b02 commit 0807c64
Show file tree
Hide file tree
Showing 19 changed files with 545 additions and 82 deletions.
1 change: 1 addition & 0 deletions build/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ COPY build/liveness-check.sh /liveness-check.sh
COPY build/readiness-check.sh /readiness-check.sh
COPY build/pmm-prerun.sh /pmm-prerun.sh
COPY build/get-pxc-state /get-pxc-state
COPY build/wsrep_cmd_notify_handler.sh /wsrep_cmd_notify_handler.sh

COPY build/haproxy-entrypoint.sh /haproxy-entrypoint.sh
COPY build/haproxy-init-entrypoint.sh /haproxy-init-entrypoint.sh
Expand Down
4 changes: 3 additions & 1 deletion build/pxc-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,12 @@ if [[ $MYSQL_VERSION =~ ^(8\.0|8\.4)$ ]]; then
sed -i "/\[mysqld\]/a enforce-gtid-consistency" $CFG
fi

sed -i "/\[mysqld\]/a wsrep_notify_cmd=/var/lib/mysql/wsrep_cmd_notify_handler.sh" $CFG

# add sst.cpat to exclude pxc-entrypoint, unsafe-bootstrap, pxc-configure-pxc from SST cleanup
grep -q "^progress=" $CFG && sed -i "s|^progress=.*|progress=1|" $CFG
grep -q "^\[sst\]" "$CFG" || printf '[sst]\n' >>"$CFG"
grep -q "^cpat=" "$CFG" || sed '/^\[sst\]/a cpat=.*\\.pem$\\|.*init\\.ok$\\|.*galera\\.cache$\\|.*wsrep_recovery_verbose\\.log$\\|.*readiness-check\\.sh$\\|.*liveness-check\\.sh$\\|.*get-pxc-state$\\|.*sst_in_progress$\\|.*sleep-forever$\\|.*pmm-prerun\\.sh$\\|.*sst-xb-tmpdir$\\|.*\\.sst$\\|.*gvwstate\\.dat$\\|.*grastate\\.dat$\\|.*\\.err$\\|.*\\.log$\\|.*RPM_UPGRADE_MARKER$\\|.*RPM_UPGRADE_HISTORY$\\|.*pxc-entrypoint\\.sh$\\|.*unsafe-bootstrap\\.sh$\\|.*pxc-configure-pxc\\.sh\\|.*peer-list$\\|.*auth_plugin$\\|.*version_info$\\|.*mysql-state-monitor$\\|.*mysql-state-monitor\\.log$\\|.*notify\\.sock$\\|.*mysql\\.state$' "$CFG" 1<>"$CFG"
grep -q "^cpat=" "$CFG" || sed '/^\[sst\]/a cpat=.*\\.pem$\\|.*init\\.ok$\\|.*galera\\.cache$\\|.*wsrep_recovery_verbose\\.log$\\|.*readiness-check\\.sh$\\|.*liveness-check\\.sh$\\|.*get-pxc-state$\\|.*sst_in_progress$\\|.*sleep-forever$\\|.*pmm-prerun\\.sh$\\|.*sst-xb-tmpdir$\\|.*\\.sst$\\|.*gvwstate\\.dat$\\|.*grastate\\.dat$\\|.*\\.err$\\|.*\\.log$\\|.*RPM_UPGRADE_MARKER$\\|.*RPM_UPGRADE_HISTORY$\\|.*pxc-entrypoint\\.sh$\\|.*unsafe-bootstrap\\.sh$\\|.*pxc-configure-pxc\\.sh\\|.*peer-list$\\|.*auth_plugin$\\|.*version_info$\\|.*mysql-state-monitor$\\|.*mysql-state-monitor\\.log$\\|.*notify\\.sock$\\|.*mysql\\.state$\\|.*wsrep_cmd_notify_handler\\.sh$' "$CFG" 1<>"$CFG"

if [[ $MYSQL_VERSION == '8.0' && $MYSQL_PATCH_VERSION -ge 26 ]] || [[ $MYSQL_VERSION == '8.4' ]]; then
grep -q "^skip_replica_start=ON" "$CFG" || sed -i "/\[mysqld\]/a skip_replica_start=ON" $CFG
Expand Down
1 change: 1 addition & 0 deletions build/pxc-init-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ install -o "$(id -u)" -g "$(id -g)" -m 0755 -D /peer-list /var/lib/mysql/peer-li
install -o "$(id -u)" -g "$(id -g)" -m 0755 -D /get-pxc-state /var/lib/mysql/get-pxc-state
install -o "$(id -u)" -g "$(id -g)" -m 0755 -D /pmm-prerun.sh /var/lib/mysql/pmm-prerun.sh
install -o "$(id -u)" -g "$(id -g)" -m 0755 -D /mysql-state-monitor /var/lib/mysql/mysql-state-monitor
install -o "$(id -u)" -g "$(id -g)" -m 0755 -D /wsrep_cmd_notify_handler.sh /var/lib/mysql/wsrep_cmd_notify_handler.sh
42 changes: 42 additions & 0 deletions build/wsrep_cmd_notify_handler.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash

set -o errexit

while [ $# -gt 0 ]; do
case $1 in
--status)
STATUS=$2
shift
;;
--uuid)
CLUSTER_UUID=$2
shift
;;
--primary)
[ "$2" = "yes" ] && PRIMARY="1" || PRIMARY="0"
shift
;;
--index)
INDEX=$2
shift
;;
--members)
MEMBERS=$2
shift
;;
esac

shift
done

CLUSTER_NAME=$(hostname -f | cut -d'-' -f1)
CLUSTER_FQDN=$(hostname -f | cut -d'.' -f3-)

if [[ "$STATUS" == "joiner" ]]; then
PITR_HOST="${CLUSTER_NAME}-pitr.${CLUSTER_FQDN}"
if getent hosts "${PITR_HOST}" >/dev/null 2>&1; then
curl -d "hostname=$(hostname -f)" "http://${PITR_HOST}:8080/invalidate-cache/"
fi
fi

exit 0
91 changes: 91 additions & 0 deletions cmd/pitr/collector/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package collector

import (
"bytes"
"context"
"encoding/json"
"log"
"time"

"github.com/percona/percona-xtradb-cluster-operator/pkg/pxc/backup/storage"
"github.com/pkg/errors"
)

type BinlogCacheEntry struct {
Binlogs map[string]string `json:"binlogs"` // binlog name -> gtid set
}

type HostBinlogCache struct {
// host -> binlogs. we use pointer here to not copy BinlogCacheEntry
// in case if it grows big and it'll grow big eventually.
Entries map[string]*BinlogCacheEntry `json:"entries"`
Version int `json:"version"`
LastUpdatedAt time.Time `json:"last_updated_at"`
}

func (e *BinlogCacheEntry) Get(key string) (string, bool) {
value, ok := e.Binlogs[key]
return value, ok
}

func (e *BinlogCacheEntry) Set(key, value string) {
e.Binlogs[key] = value
}

func loadCache(ctx context.Context, s storage.Storage, key string) (*HostBinlogCache, error) {
cache := &HostBinlogCache{
Entries: make(map[string]*BinlogCacheEntry),
Version: 1,
}

objReader, err := s.GetObject(ctx, key)
if err != nil {
if errors.Is(err, storage.ErrObjectNotFound) {
log.Printf("WARNING: cache file %s not found", key)
return cache, nil
}
return nil, errors.Wrap(err, "get cache from storage")
}
defer objReader.Close()

if err := json.NewDecoder(objReader).Decode(cache); err != nil {
return nil, errors.Wrap(err, "decode cache")
}

return cache, nil
}

func saveCache(ctx context.Context, s storage.Storage, cache *HostBinlogCache, key string) error {
log.Printf("updating binlog cache")
cache.LastUpdatedAt = time.Now()

data, err := json.Marshal(cache)
if err != nil {
return errors.Wrap(err, "marshal cache")
}

err = s.PutObject(ctx, key, bytes.NewReader(data), int64(len(data)))
return errors.Wrap(err, "put cache to s3")
}

func InvalidateCache(ctx context.Context, c *Collector, hostname string) error {
cache, err := loadCache(ctx, c.GetStorage(), c.GetGTIDCacheKey())
if err != nil {
return errors.Wrap(err, "load cache")
}

_, ok := cache.Entries[hostname]
if !ok {
return errors.Errorf("failed to find cache for %s", hostname)
}

log.Printf("invalidating cache for %s", hostname)
delete(cache.Entries, hostname)

err = saveCache(ctx, c.GetStorage(), cache, c.GetGTIDCacheKey())
if err != nil {
return errors.Wrap(err, "save cache")
}

return nil
}
Loading

0 comments on commit 0807c64

Please sign in to comment.