K8SPXC-1546: Cache binlog->gtid set pairs #1959
cmd/pitr/collector/cache.go
Outdated
)

const (
	CacheKey = "gtid-binlog-cache.json"
Since this constant is used only at package level, we can make it unexported.
I moved it to the binlogcollector package. The operator now adds it to the collector deployment as an env variable, because we need to use it in operator code too.
}

func saveCache(ctx context.Context, storage storage.Storage, cache *HostBinlogCache) error {
	log.Printf("updating binlog cache")
Are we going to keep all of this logging, or is it here only for debugging for now?
I intend to keep these logs, yes.
cmd/pitr/collector/cache.go
Outdated
objReader, err := storage.GetObject(ctx, CacheKey)
if err != nil {
	if strings.Contains(err.Error(), "object not found") {
Here we can use `errors.Is(err, storage.ErrObjectNotFound)`, since we already import the `storage` package. Keep in mind that we have to rename the input `storage` variable to something else; I propose `s`.
cmd/pitr/collector/cache.go
Outdated
}

type HostBinlogCache struct {
	Entries map[string]*BinlogCacheEntry `json:"entries"` // host -> binlogs
Maybe it is worth adding a small comment noting that the pointer is used here mainly because `BinlogCacheEntry` can grow big. AFAIU that's the reason.
cmd/pitr/collector/collector.go
Outdated
log.Printf("%s: %s", binlog.Name, set)
gtidSet = set
cache.Entries[c.db.GetHost()].Binlogs[binlog.Name] = set
Since here we are essentially referring to the same `hostCache` as L.370, we can introduce something like this in cache.go:
func (e *BinlogCacheEntry) Set(key, value string) {
	e.Binlogs[key] = value
}
and instead of writing this long assignment, we can simplify it to: `hostCache.Set(binlog.Name, gtidSet)`
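For illustration, a self-contained sketch of the proposed setter. Only the `Binlogs` field name comes from the diff; the JSON tag and the nil-map guard are assumptions added here, the guard so that `Set` is safe on a zero-value entry.

```go
package main

import "fmt"

// BinlogCacheEntry mirrors the type named in the review. The JSON tag is an
// assumption; the diff only shows that the type has a Binlogs map.
type BinlogCacheEntry struct {
	Binlogs map[string]string `json:"binlogs"` // binlog name -> GTID set
}

// Set records the GTID set for a binlog. The nil check is an addition that is
// not part of the review suggestion: writing to a nil map panics in Go.
func (e *BinlogCacheEntry) Set(key, value string) {
	if e.Binlogs == nil {
		e.Binlogs = make(map[string]string)
	}
	e.Binlogs[key] = value
}

func main() {
	hostCache := &BinlogCacheEntry{}
	hostCache.Set("binlog.000001", "uuid:1-10")
	fmt.Println(hostCache.Binlogs["binlog.000001"])
}
```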
cmd/pitr/collector/collector.go
Outdated
log.Printf("%s: %s", binlog.Name, set)

binlogs[i].GTIDSet = pxc.NewGTIDSet(set)
cache.Entries[c.db.GetHost()].Binlogs[binlog.Name] = binlogs[i].GTIDSet.Raw()
`hostCache.Set(binlog.Name, binlogs[i].GTIDSet.Raw())` can be used here as well.
	Name:  "GTID_CACHE_KEY",
	Value: GTIDCacheKey,
})
remove this
73551ef to ac12e6b
--status)
	STATUS=$2
	shift
	;;
--uuid)
	CLUSTER_UUID=$2
	shift
	;;
--primary)
	[ "$2" = "yes" ] && PRIMARY="1" || PRIMARY="0"
	shift
	;;
--index)
	INDEX=$2
	shift
	;;
--members)
	MEMBERS=$2
	shift
	;;
[shfmt] reported by reviewdog 🐶
CLUSTER_NAME=$(hostname -f | cut -d'-' -f1)
CLUSTER_FQDN=$(hostname -f | cut -d'.' -f3-)

if [[ "$STATUS" == "joiner" ]]; then
[shfmt] reported by reviewdog 🐶
- if [[ "$STATUS" == "joiner" ]]; then
+ if [[ $STATUS == "joiner" ]]; then
some small additional comments, overall seems 💪🏽 to me
cmd/pitr/main.go
Outdated
	return
}

ctx := context.Background()
It is preferable to use `r.Context()` here, since it is also cancelled when the request ends and can carry timeouts. It defaults to the background context if none is set.
Didn't know about `r.Context()`, makes complete sense.
cmd/pitr/main.go
Outdated
if err != nil {
	w.WriteHeader(http.StatusInternalServerError)
	log.Println("ERROR: get collector config:", err)
	return
}

c, err := collector.New(ctx, config)
if err != nil {
	w.WriteHeader(http.StatusInternalServerError)
	log.Println("ERROR: get new collector:", err)
	return
}

cache, err := collector.LoadCache(ctx, c.GetStorage(), c.GetGTIDCacheKey())
if err != nil {
	w.WriteHeader(http.StatusInternalServerError)
	log.Println("ERROR: failed to load cache:", err)
	return
}

_, ok := cache.Entries[hostname]
if !ok {
	w.WriteHeader(http.StatusBadRequest)
	if _, err := w.Write([]byte("hostname couldn't find in cache")); err != nil {
The collector package could also expose a `collector.InvalidateCache` function that we could call directly instead of composing it as we do here; it would also be reusable. It fits the existing API well, which already:
- loads the cache
- saves the cache
If we don't have time for this now, we can definitely improve it in the future. It is an easy fix.
If we do that, we can also make the load and save cache functions unexported, since they would then be used only within the collector package.
Currently, the binlog collector flushes binary logs every X seconds, where X is configurable by users through cr.yaml. Users usually run the collector every 60 seconds so that any event from the last minute is collected and their recovery objectives are met. Running the collector every 60 seconds has the nasty side effect of creating a lot of binlogs: for example, a month of operation creates `30 * 24 * 60 = 43200` binlogs.

The collector connects to the PXC host that has the oldest binlog. That means the host it collects binlogs from changes after binlogs are purged and/or expired. For this reason, the collector mainly cares about the GTID sets in each binlog file. In every run, the collector first runs `SHOW BINARY LOGS` to get the binlog list and then runs `SELECT get_gtid_set_by_binlog(?)` for each binlog in the list to assign a GTID set to it.

After running for a month (and accumulating 43200+ binlogs as a result), the collector loses the ability to recover from a crash, because it is impossible to go through 43k+ binlogs in a 60-second time window. For this reason, we decided to cache binlog->gtid set pairs.

With these changes, the collector maintains a cache in JSON in the same storage that the binlogs are uploaded to. When the collector starts, it first checks for the cache file; if it can't find the cache, it ignores all timeouts until the cache is populated. The cache contains binlog names and GTID sets for each PXC host.

Some events require the cache to be invalidated because they affect the binlogs on hosts:

* After a restore, the cache needs to be invalidated for all hosts. This is performed by the operator after the restore succeeds. The operator simply deletes the cache file in binlog storage to make the collector re-populate the cache.
* After an SST, the cache needs to be invalidated for the Joiner host. This is performed by the Joiner host itself. To achieve this, we use `wsrep_notify_cmd` to trigger our script whenever there's a status change. The triggered script sends an HTTP request to the collector pod (using the Service we create for the collector) with its hostname, and the collector deletes the cache entries for that host and updates the cache file in storage.
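To illustrate the idea described above, here is a minimal sketch of a cache-first lookup. `gtidLookup` and `resolveGTIDSets` are hypothetical names, and the callback stands in for running `SELECT get_gtid_set_by_binlog(?)`; this is not the collector's actual code. It also ignores that the newest binlog is presumably still being written, which a real implementation would have to handle separately.

```go
package main

import "fmt"

// gtidLookup stands in for querying the host for one binlog's GTID set.
type gtidLookup func(binlog string) (string, error)

// resolveGTIDSets fills cached with the GTID set of every listed binlog,
// querying only for binlogs that are not cached yet. It returns the number
// of queries it had to run.
func resolveGTIDSets(binlogs []string, cached map[string]string, query gtidLookup) (int, error) {
	queries := 0
	for _, name := range binlogs {
		if _, ok := cached[name]; ok {
			continue // already known, no round trip needed
		}
		set, err := query(name)
		if err != nil {
			return queries, fmt.Errorf("get GTID set of %s: %w", name, err)
		}
		cached[name] = set
		queries++
	}
	return queries, nil
}

func main() {
	cached := map[string]string{"binlog.000001": "uuid:1-10"}
	query := func(name string) (string, error) { return "uuid:11-20", nil }

	n, err := resolveGTIDSets([]string{"binlog.000001", "binlog.000002"}, cached, query)
	fmt.Println("queries:", n, "err:", err, "cached:", len(cached))
}
```

After a restart with a populated cache, the number of queries is bounded by the binlogs created since the last save rather than by the full 43k+ list.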
ac12e6b to 1794cd4
commit: 2ba4b02