Change the namespacing model for Aerospike
Previously we started one probe worker per cluster, with one driver per worker.

Now we spawn one worker per namespace per cluster.
The main benefit is that namespaces are handled
independently by the probe: it won't get stuck if
one namespace fails to start.
geobeau committed Jan 6, 2023
1 parent c62e04f commit bfddaff
Showing 6 changed files with 210 additions and 197 deletions.
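To make the intent concrete, here is a rough, hypothetical sketch of the per-namespace worker model described in the commit message. The scheduler itself is not part of the files shown on this page, and every name in the sketch (NamespacedEndpoint, runWorker) is illustrative rather than the probe's real API; it only shows why one goroutine per (cluster, namespace) pair keeps a failing namespace from blocking the others.

package main

import (
    "fmt"
    "time"
)

// NamespacedEndpoint stands in for the per-namespace AerospikeEndpoint
// that the discovery code in this commit now produces (one per namespace per cluster).
type NamespacedEndpoint struct {
    ClusterName string
    Namespace   string
}

// runWorker is a placeholder for the probe's per-endpoint check loop;
// in the real probe this is where LatencyCheck/DurabilityCheck would run.
func runWorker(e NamespacedEndpoint) {
    for {
        fmt.Printf("probing cluster=%s namespace=%s\n", e.ClusterName, e.Namespace)
        time.Sleep(10 * time.Second)
    }
}

func main() {
    endpoints := []NamespacedEndpoint{
        {ClusterName: "cluster-a", Namespace: "ns1"},
        {ClusterName: "cluster-a", Namespace: "ns2"},
        {ClusterName: "cluster-b", Namespace: "ns1"},
    }
    // One goroutine per namespace per cluster: a namespace that is slow or
    // failing to start only stalls its own worker, not its siblings.
    for _, e := range endpoints {
        go runWorker(e)
    }
    select {} // block forever; a real scheduler would handle shutdown
}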
232 changes: 113 additions & 119 deletions pkg/aerospike/checks.go
@@ -94,78 +94,76 @@ func LatencyCheck(p topology.ProbeableEndpoint) error {
 	policy.MaxRetries = 0            // Ensure we never retry
 	policy.ReplicaPolicy = as.MASTER // Read are always done on master
 
-	for namespace := range e.Namespaces {
 	for range e.Client.Cluster().GetNodes() { // scale the number of latency checks to the number of nodes
 		e.Client.WarmUp(e.Config.genericConfig.ConnectionQueueSize)
 		// TODO configurable set
-		key, as_err := as.NewKey(namespace, e.Config.genericConfig.MonitoringSet, fmt.Sprintf("%s%s", keyPrefix, utils.RandomHex(20)))
+		key, as_err := as.NewKey(e.Namespace, e.Config.genericConfig.MonitoringSet, fmt.Sprintf("%s%s", keyPrefix, utils.RandomHex(20)))
 		if as_err != nil {
 			return as_err
 		}
 		val := as.BinMap{
 			"val": utils.RandomHex(1024),
 		}
 
 		node, err := getWriteNode(e.Client, policy, key)
 		if err != nil {
 			return errors.Wrapf(err, "error when trying to find node for: %s", keyAsStr(key))
 		}
 
 		// PUT OPERATION
-		labels := []string{"put", node.GetHost().Name, namespace, e.ClusterName, node.GetName()}
+		labels := []string{"put", node.GetHost().Name, e.Namespace, e.ClusterName, node.GetName()}
 
 		opPut := func() error {
 			return e.Client.Put(policy, key, val)
 		}
 
 		err = ObserveOpLatency(opPut, labels)
 		if err != nil {
 			return errors.Wrapf(err, "record put failed for: %s", keyAsStr(key))
 		}
 		level.Debug(e.Logger).Log("msg", fmt.Sprintf("record put: %s", keyAsStr(key)))
 
 		// GET OPERATION
 		labels[0] = "get"
 		opGet := func() error {
 			recVal, err := e.Client.Get(&policy.BasePolicy, key)
 			if err != nil {
 				return err
 			}
 			if recVal == nil {
 				return errors.Errorf("Record not found after being put")
 			}
 			if recVal.Bins["val"] != val["val"] {
 				return errors.Errorf("Get succeeded but there is a missmatch between server value {%s} and pushed value", recVal.Bins["val"])
 			}
 			return err
 		}
 
 		err = ObserveOpLatency(opGet, labels)
 		if err != nil {
 			return errors.Wrapf(err, "record get failed for: %s", keyAsStr(key))
 		}
 		level.Debug(e.Logger).Log("msg", fmt.Sprintf("record get: %s", keyAsStr(key)))
 
 		// DELETE OPERATION
 		labels[0] = "delete"
 		opDelete := func() error {
 			existed, as_err := e.Client.Delete(policy, key)
 			if !existed {
 				return errors.Errorf("Delete succeeded but there was no data to delete")
 			}
 			if as_err != nil {
 				return as_err
 			} else {
 				return nil
 			}
 		}
 
 		err = ObserveOpLatency(opDelete, labels)
 		if err != nil {
 			return errors.Wrapf(err, "record delete failed for: %s", keyAsStr(key))
 		}
 		level.Debug(e.Logger).Log("msg", fmt.Sprintf("record delete: %s", keyAsStr(key)))
 	}
-	}
 	return nil
 }
@@ -186,50 +184,48 @@ func DurabilityPrepare(p topology.ProbeableEndpoint) error {
 	// If the probe find a missmatch it will repush the keys
 	expectedAllPushedFlagVal := fmt.Sprintf("%s:%d", "v1", keyRange) // v1 represents the format of the data stored.
 
-	for namespace := range e.Namespaces {
 	// allPushedFlag indicate if a probe have pushed all data once
-	allPushedFlag, err := as.NewKey(namespace, e.Config.genericConfig.MonitoringSet, fmt.Sprintf("%s%s", keyPrefix, "all_pushed_flag"))
+	allPushedFlag, err := as.NewKey(e.Namespace, e.Config.genericConfig.MonitoringSet, fmt.Sprintf("%s%s", keyPrefix, "all_pushed_flag"))
 	if err != nil {
 		return err
 	}
 
 	recVal, err := e.Client.Get(&policy.BasePolicy, allPushedFlag)
 	if err != nil && !err.Matches(as.ErrKeyNotFound.ResultCode) {
 		return err
 	}
 	// If the flag was found we skip the init as it has already been done
 	if recVal != nil && recVal.Bins["val"] == expectedAllPushedFlagVal {
-		continue
+		return nil
 	}
 
 	for i := 0; i < keyRange; i++ {
 		keyName := fmt.Sprintf("%s%d", keyPrefix, i)
-		key, err := as.NewKey(namespace, e.Config.genericConfig.MonitoringSet, keyName)
+		key, err := as.NewKey(e.Namespace, e.Config.genericConfig.MonitoringSet, keyName)
 		if err != nil {
 			return err
 		}
 
 		val := as.BinMap{
 			"val": hash(keyName),
 		}
 
 		err = e.Client.Put(policy, key, val)
 		if err != nil {
 			return errors.Wrapf(err, "record put failed for: %s", keyAsStr(key))
 		}
 		level.Debug(e.Logger).Log("msg", fmt.Sprintf("record durability put: %s (%s)", keyAsStr(key), val["val"]))
 	}
 
 	allPushedFlagVal := as.BinMap{
 		"val": expectedAllPushedFlagVal,
 	}
 	err = e.Client.Put(policy, allPushedFlag, allPushedFlagVal)
 	if err != nil {
 		return errors.Wrapf(err, "Push flag put failed for: %s", keyAsStr(allPushedFlag))
 	}
-	}
 
 	return nil
 }

@@ -243,34 +239,32 @@ func DurabilityCheck(p topology.ProbeableEndpoint) error {
 	keyRange := e.Config.genericConfig.DurabilityKeyTotal
 	keyPrefix := e.Config.genericConfig.DurabilityKeyPrefix
 
-	for namespace := range e.Namespaces {
 	total_found_items := 0.0
 	total_corrupted_items := 0.0
 	for i := 0; i < keyRange; i++ {
 		keyName := fmt.Sprintf("%s%d", keyPrefix, i)
-		key, err := as.NewKey(namespace, e.Config.genericConfig.MonitoringSet, keyName)
+		key, err := as.NewKey(e.Namespace, e.Config.genericConfig.MonitoringSet, keyName)
 		if err != nil {
 			return err
 		}
 
 		recVal, err := e.Client.Get(policy, key)
 		if err != nil {
 			level.Error(e.Logger).Log("msg", fmt.Sprintf("Error while fetching record: %s", keyAsStr(key)), "err", err)
 			continue
 		}
 		if recVal.Bins["val"] != hash(keyName) {
 			level.Warn(e.Logger).Log("msg",
 				fmt.Sprintf("Get successful but the data didn't match what was expected got: '%s', expected: '%s' (for %s)",
 					recVal.Bins["val"], hash(keyName), keyAsStr(key)))
 			total_corrupted_items += 1
 		} else {
 			total_found_items += 1
 		}
 		level.Debug(e.Logger).Log("msg", fmt.Sprintf("durability record validated: %s (%s)", keyAsStr(key), recVal.Bins["val"]))
 	}
-	durabilityExpectedItems.WithLabelValues(namespace, e.ClusterName, e.GetName()).Set(float64(keyRange))
-	durabilityFoundItems.WithLabelValues(namespace, e.ClusterName, e.GetName()).Set(total_found_items)
-	durabilityCorruptedItems.WithLabelValues(namespace, e.ClusterName, e.GetName()).Set(total_corrupted_items)
-	}
+	durabilityExpectedItems.WithLabelValues(e.Namespace, e.ClusterName, e.GetName()).Set(float64(keyRange))
+	durabilityFoundItems.WithLabelValues(e.Namespace, e.ClusterName, e.GetName()).Set(total_found_items)
+	durabilityCorruptedItems.WithLabelValues(e.Namespace, e.ClusterName, e.GetName()).Set(total_corrupted_items)
 	return nil
 }
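With the outer namespace loop gone, each worker now reports the durability gauges for exactly one namespace. The gauges used above are presumably Prometheus GaugeVecs along the following lines; their real definitions live elsewhere in the package, and the metric name and label names here are assumptions inferred from the WithLabelValues calls.

package aerospike

import "github.com/prometheus/client_golang/prometheus"

// Assumed shape of one of the durability gauges used in DurabilityCheck above.
// Label order matches WithLabelValues(e.Namespace, e.ClusterName, e.GetName()):
// namespace, cluster, endpoint name.
var durabilityFoundItems = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{
        Name: "aerospike_durability_found_items", // assumed metric name
        Help: "Number of durability keys found intact for a namespace.",
    },
    []string{"namespace", "cluster", "name"}, // assumed label names
)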
91 changes: 50 additions & 41 deletions pkg/aerospike/discovery.go
@@ -14,7 +14,7 @@ import (
 	"github.com/go-kit/log/level"
 )
 
-func (conf *AerospikeProbeConfig) generateAerospikeEndpointFromEntry(logger log.Logger, entry discovery.ServiceEntry) (*AerospikeEndpoint, error) {
+func (conf *AerospikeProbeConfig) generateNamespacedEndpointsFromEntry(logger log.Logger, entry discovery.ServiceEntry) ([]*AerospikeEndpoint, error) {
 	authEnabled := conf.AerospikeEndpointConfig.AuthEnabled
 	var (
 		username string
@@ -47,53 +47,62 @@ func (conf *AerospikeProbeConfig) generateAerospikeEndpointFromEntry(logger log.Logger, entry discovery.ServiceEntry) (*AerospikeEndpoint, error) {
 		clusterName = entry.Address
 	}
 
-	namespaces := make(map[string]struct{})
-
-	if conf.AerospikeEndpointConfig.NamespaceMetaKey != "" {
-		nsString, ok := entry.Meta[conf.AerospikeEndpointConfig.NamespaceMetaKey]
-		if ok {
-			nsFromDiscovery := strings.Split(nsString, ";")
-			for _, ns := range nsFromDiscovery {
-				namespaces[ns] = struct{}{}
-			}
-		}
-	}
+	namespaces := conf.getNamespacesFromEntry(entry)
 
-	return &AerospikeEndpoint{Name: entry.Address,
-		ClusterName: clusterName,
-		Namespaces:  namespaces,
-		Config: AerospikeClientConfig{
-			// auth
-			authEnabled: authEnabled,
-			username:    username,
-			password:    password,
-			// tls
-			tlsEnabled:  tlsEnabled,
-			tlsHostname: tlsHostname,
-			// conf
-			genericConfig: &conf.AerospikeEndpointConfig,
-			// Contact point
-			host: as.Host{Name: entry.Address, TLSName: tlsHostname, Port: entry.Port}},
-		Logger: log.With(logger, "endpoint_name", entry.Address),
-	}, nil
+	var endpoints []*AerospikeEndpoint
+	for namespace := range namespaces {
+		e := &AerospikeEndpoint{Name: clusterName,
+			ClusterName:  clusterName,
+			Namespace:    namespace,
+			ClusterLevel: true,
+			Config: AerospikeClientConfig{
+				// auth
+				authEnabled: authEnabled,
+				username:    username,
+				password:    password,
+				// tls
+				tlsEnabled:  tlsEnabled,
+				tlsHostname: tlsHostname,
+				// conf
+				genericConfig: &conf.AerospikeEndpointConfig,
+				// Contact point
+				host: as.Host{Name: entry.Address, TLSName: tlsHostname, Port: entry.Port}},
+			Logger: log.With(logger, "endpoint_name", entry.Address),
+		}
+		endpoints = append(endpoints, e)
+	}
+
+	return endpoints, nil
 }
 
-func (conf AerospikeProbeConfig) generateNodeFromEntry(logger log.Logger, entry discovery.ServiceEntry) (topology.ProbeableEndpoint, error) {
-	return conf.generateAerospikeEndpointFromEntry(logger, entry)
-}
-
-func (conf AerospikeProbeConfig) generateClusterFromEntries(logger log.Logger, entries []discovery.ServiceEntry) (topology.ProbeableEndpoint, error) {
-	endpoint, err := conf.generateAerospikeEndpointFromEntry(logger, entries[0])
-	if err != nil {
-		return endpoint, err
-	}
-	endpoint.Name = entries[0].Meta[conf.DiscoveryConfig.MetaClusterKey]
-	endpoint.ClusterName = entries[0].Meta[conf.DiscoveryConfig.MetaClusterKey]
-	endpoint.Logger = log.With(logger, "endpoint_name", endpoint.Name)
-	endpoint.ClusterLevel = true
-	return endpoint, nil
+func (conf AerospikeProbeConfig) getNamespacesFromEntry(entry discovery.ServiceEntry) map[string]struct{} {
+	namespaces := make(map[string]struct{})
+	nsString, ok := entry.Meta[conf.AerospikeEndpointConfig.NamespaceMetaKey]
+	if ok {
+		nsFromDiscovery := strings.Split(nsString, ";")
+		for _, ns := range nsFromDiscovery {
+			namespaces[ns] = struct{}{}
+		}
+	}
+	return namespaces
 }
 
-func (conf AerospikeProbeConfig) GenerateTopologyBuilder() func(log.Logger, []discovery.ServiceEntry) (topology.ClusterMap, error) {
-	return conf.DiscoveryConfig.GetGenericTopologyBuilder(conf.generateClusterFromEntries, conf.generateNodeFromEntry)
+func (conf AerospikeProbeConfig) NamespacedTopologyBuilder() func(log.Logger, []discovery.ServiceEntry) (topology.ClusterMap, error) {
+	return func(logger log.Logger, entries []discovery.ServiceEntry) (topology.ClusterMap, error) {
+		clusterMap := topology.NewClusterMap()
+		clusterEntries := conf.DiscoveryConfig.GroupNodesByCluster(logger, entries)
+		for _, entries := range clusterEntries {
+			endpoints, err := conf.generateNamespacedEndpointsFromEntry(logger, entries[0])
+			if err != nil {
+				return clusterMap, err
+			}
+
+			for _, endpoint := range endpoints {
+				cluster := topology.NewCluster(endpoint)
+				clusterMap.AppendCluster(cluster)
+			}
+		}
+		return clusterMap, nil
+	}
 }
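The AerospikeEndpoint type itself is defined in one of the files not shown on this page. Judging from the composite literal above and the e.Namespace / e.Client usages in checks.go, the namespace-related part of the struct presumably changed roughly as follows; this is a sketch under that assumption, not the actual definition, and the import paths are guesses at the module versions in use.

package aerospike

import (
    as "github.com/aerospike/aerospike-client-go/v6" // assumed major version
    "github.com/go-kit/log"
)

// Sketch of the assumed AerospikeEndpoint shape after this commit: the
// Namespaces set is replaced by a single Namespace per endpoint, so every
// worker probes exactly one namespace.
type AerospikeEndpoint struct {
    Name         string
    ClusterName  string
    Namespace    string // was: Namespaces map[string]struct{}
    ClusterLevel bool
    Config       AerospikeClientConfig
    Client       *as.Client // assumed; used as e.Client in checks.go
    Logger       log.Logger
}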