Skip to content

Commit

Permalink
Handle etcd status condition on cluster reset and when etcd is disabled
Browse files Browse the repository at this point in the history
Signed-off-by: Vitor Savian <[email protected]>

Set condition if node is unhealthy

Signed-off-by: Vitor Savian <[email protected]>
  • Loading branch information
vitorsavian committed Jan 3, 2024
1 parent 231cb6e commit f1d33ce
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 33 deletions.
119 changes: 86 additions & 33 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/kubernetes"
nodeHelper "k8s.io/component-helpers/node/util"
nodeUtil "k8s.io/kubernetes/pkg/controller/util/node"
)
Expand All @@ -70,6 +70,13 @@ const (
defaultKeepAliveTimeout = 10 * time.Second

maxBackupRetention = 5

etcdStatusType = v1.NodeConditionType("EtcdIsVoter")

StatusUnjoined MemberStatus = "unjoined"
StatusUnhealthy MemberStatus = "unhealthy"
StatusLearner MemberStatus = "learner"
StatusVoter MemberStatus = "voter"
)

var (
Expand All @@ -91,6 +98,8 @@ type NodeControllerGetter func() controllerv1.NodeController
// explicit interface check
var _ managed.Driver = &ETCD{}

type MemberStatus string

type ETCD struct {
client *clientv3.Client
config *config.Control
Expand Down Expand Up @@ -1036,6 +1045,7 @@ func (e *ETCD) manageLearners(ctx context.Context) {
logrus.Debug("Etcd client was nil")
continue
}

endpoints := getEndpoints(e.config)
if status, err := e.client.Status(ctx, endpoints[0]); err != nil {
logrus.Errorf("Failed to check local etcd status for learner management: %v", err)
Expand Down Expand Up @@ -1067,28 +1077,52 @@ func (e *ETCD) manageLearners(ctx context.Context) {
logrus.Warnf("Failed to list nodes with etcd role: %v", err)
}

// a map to track if a node is a member of the etcd cluster or not
nodeIsMember := make(map[string]bool)
nodesMap := make(map[string]*v1.Node)
for _, node := range nodes {
nodeIsMember[node.Name] = false
nodesMap[node.Name] = node
}

for _, member := range members.Members {
var node *v1.Node
for _, node = range nodes {
if strings.HasPrefix(member.Name, node.Name+"-") {
nodeIsMember[node.Name] = true
}
}

if member.IsLearner {
if err := e.trackLearnerProgress(ctx, progress, member); err != nil {
logrus.Errorf("Failed to track learner progress towards promotion: %v", err)
}
for _, node := range nodes {
if strings.HasPrefix(member.Name, node.Name+"-") {
_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, false)
if err != nil {
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
}
}

if err := e.setEtcdStatusCondition(node, client, member.Name, StatusLearner); err != nil {
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
}
break
}

for _, node := range nodes {
if strings.HasPrefix(member.Name, node.Name+"-") {
_, _, err := e.setEtcdStatusCondition(node, client.CoreV1(), member.Name, true)
if err != nil {
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
}
// verify if the member is healthy and set the etcd status condition
if _, err := e.getETCDStatus(ctx, member.ClientURLs[0]); err != nil {
logrus.Errorf("Could not get status for member %s: %v", member.Name, err)
if err := e.setEtcdStatusCondition(node, client, member.Name, StatusUnhealthy); err != nil {
logrus.Errorf("Unable to set etcd status condition for unhealthy node %s: %v", member.Name, err)
}
continue
}

if err := e.setEtcdStatusCondition(node, client, member.Name, StatusVoter); err != nil {
logrus.Errorf("Unable to set etcd status condition %s: %v", member.Name, err)
}
}

for nodeName, isMember := range nodeIsMember {
if !isMember {
node := nodesMap[nodeName]
if err := e.setEtcdStatusCondition(node, client, nodeName, StatusUnjoined); err != nil {
logrus.Errorf("Unable to set etcd status condition for a node that is not a cluster member %s: %v", nodeName, err)
}
}
}
Expand Down Expand Up @@ -1129,9 +1163,7 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre

// Update progress by retrieving status from the member's first reachable client URL
for _, ep := range member.ClientURLs {
ctx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()
status, err := e.client.Status(ctx, ep)
status, err := e.getETCDStatus(ctx, ep)
if err != nil {
logrus.Debugf("Failed to get etcd status from learner %s at %s: %v", member.Name, ep, err)
continue
Expand Down Expand Up @@ -1162,46 +1194,67 @@ func (e *ETCD) trackLearnerProgress(ctx context.Context, progress *learnerProgre
return e.setLearnerProgress(ctx, progress)
}

func (e *ETCD) setEtcdStatusCondition(node *v1.Node, client corev1.CoreV1Interface, memberName string, promoted bool) (*v1.Node, []byte, error) {
etcdStatusType := v1.NodeConditionType("EtcdIsVoter")
// getETCDStatus asks the ETCD's client for the status of the etcd endpoint at
// url. The request runs under a context derived from ctx and bounded by
// defaultDialTimeout; the derived context is cancelled when this function
// returns. The client's response and error are passed back unchanged.
func (e *ETCD) getETCDStatus(ctx context.Context, url string) (*clientv3.StatusResponse, error) {
	// Use a separately named context so the caller's ctx is not shadowed.
	statusCtx, cancel := context.WithTimeout(ctx, defaultDialTimeout)
	defer cancel()

	resp, err := e.client.Status(statusCtx, url)
	return resp, err
}

func (e *ETCD) setEtcdStatusCondition(node *v1.Node, client kubernetes.Interface, memberName string, memberStatus MemberStatus) error {
var newCondition v1.NodeCondition
if promoted {
switch memberStatus {
case StatusLearner:
newCondition = v1.NodeCondition{
Type: etcdStatusType,
Status: "False",
Reason: "MemberIsLearner",
Message: "Node has not been promoted to voting member of the etcd cluster",
}
case StatusVoter:
newCondition = v1.NodeCondition{
Type: etcdStatusType,
Status: "True",
Reason: "MemberNotLearner",
Message: "Node is a voting member of the etcd cluster",
}
} else {
case StatusUnhealthy:
newCondition = v1.NodeCondition{
Type: etcdStatusType,
Status: "False",
Reason: "MemberIsLearner",
Message: "Node has not been promoted to voting member of the etcd cluster",
Reason: "Unhealthy",
Message: "Node is unhealthy",
}
case StatusUnjoined:
newCondition = v1.NodeCondition{
Type: etcdStatusType,
Status: "False",
Reason: "NotAMember",
Message: "Node is not a member of the etcd cluster",
}
default:
logrus.Warnf("Unknown etcd member status %s", memberStatus)
return nil
}

updatedNode := *node
if find, condition := nodeUtil.GetNodeCondition(&updatedNode.Status, etcdStatusType); find >= 0 {
if condition.Status == newCondition.Status {
logrus.Debugf("Member %s is not changing etcd status condition", memberName)
if find, condition := nodeUtil.GetNodeCondition(&node.Status, etcdStatusType); find >= 0 {
if condition.Status == newCondition.Status && memberStatus != StatusUnjoined {
logrus.Debugf("Node %s is not changing etcd status condition", memberName)
condition.LastHeartbeatTime = metav1.Now()
return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), *condition)
}

logrus.Debugf("Member %s is changing etcd condition", memberName)
logrus.Debugf("Node %s is changing etcd status condition", memberName)
condition = &newCondition
condition.LastHeartbeatTime = metav1.Now()
condition.LastTransitionTime = metav1.Now()
return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), *condition)
}

logrus.Infof("Adding etcd member %s status condition", memberName)
logrus.Infof("Adding node %s etcd status condition", memberName)
newCondition.LastHeartbeatTime = metav1.Now()
newCondition.LastTransitionTime = metav1.Now()
updatedNode.Status.Conditions = append(updatedNode.Status.Conditions, newCondition)
return nodeHelper.PatchNodeStatus(client, types.NodeName(node.Name), node, &updatedNode)
return nodeHelper.SetNodeCondition(client, types.NodeName(node.Name), newCondition)
}

// getLearnerProgress returns the stored learnerProgress struct as retrieved from etcd
Expand Down
5 changes: 5 additions & 0 deletions pkg/etcd/metadata_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/util/retry"
nodeUtil "k8s.io/kubernetes/pkg/controller/util/node"
)

func registerMetadataHandlers(ctx context.Context, etcd *ETCD) {
Expand Down Expand Up @@ -108,6 +109,10 @@ func (m *metadataHandler) handleSelf(node *v1.Node) (*v1.Node, error) {
node.Labels = map[string]string{}
}

if find, _ := nodeUtil.GetNodeCondition(&node.Status, etcdStatusType); find >= 0 {
node.Status.Conditions = append(node.Status.Conditions[:find], node.Status.Conditions[find+1:]...)
}

delete(node.Annotations, NodeNameAnnotation)
delete(node.Annotations, NodeAddressAnnotation)
delete(node.Labels, util.ETCDRoleLabelKey)
Expand Down

0 comments on commit f1d33ce

Please sign in to comment.