Fix #19 - work with autoscaler
eterna2 committed Oct 16, 2019
1 parent a81cd39 commit eccc757
Showing 4 changed files with 108 additions and 6 deletions.
2 changes: 2 additions & 0 deletions hooks/build
@@ -0,0 +1,2 @@
#!/bin/bash
docker build --build-arg OS=linux --build-arg ARCH=amd64 -f $DOCKERFILE_PATH -t $IMAGE_NAME .
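
This is a Docker Hub automated-build hook: Docker Hub runs hooks/build with DOCKERFILE_PATH and IMAGE_NAME already exported. To reproduce the build locally you would set them yourself, for example DOCKERFILE_PATH=Dockerfile IMAGE_NAME=aws-asg-roller ./hooks/build (both values hypothetical).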
74 changes: 71 additions & 3 deletions kubernetes.go
@@ -8,7 +8,7 @@ import (

drain "github.com/openshift/kubernetes-drain"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -17,6 +17,7 @@ import (
type kubernetesReadiness struct {
clientset *kubernetes.Clientset
ignoreDaemonSets bool
deleteLocalData bool
}

func (k *kubernetesReadiness) getUnreadyCount(hostnames []string, ids []string) (int, error) {
@@ -71,6 +72,7 @@ func (k *kubernetesReadiness) prepareTermination(hostnames []string, ids []strin
IgnoreDaemonsets: k.ignoreDaemonSets,
GracePeriodSeconds: -1,
Force: true,
DeleteLocalData: k.deleteLocalData,
})
if err != nil {
return fmt.Errorf("Unexpected error draining kubernetes node %s: %v", hostname, err)
@@ -128,13 +130,79 @@ func homeDir() string {
return os.Getenv("USERPROFILE") // windows
}

func kubeGetReadinessHandler(ignoreDaemonSets bool) (readiness, error) {
func kubeGetReadinessHandler(ignoreDaemonSets bool, deleteLocalData bool) (readiness, error) {
clientset, err := kubeGetClientset()
if err != nil {
log.Fatalf("Error getting kubernetes connection: %v", err)
}
if clientset == nil {
return nil, nil
}
return &kubernetesReadiness{clientset: clientset, ignoreDaemonSets: ignoreDaemonSets}, nil
return &kubernetesReadiness{clientset: clientset, ignoreDaemonSets: ignoreDaemonSets, deleteLocalData: deleteLocalData}, nil
}

// setScaleDownDisabledAnnotation sets the "cluster-autoscaler.kubernetes.io/scale-down-disabled" annotation
// on the given nodes where required. Returns the list of hostnames where the annotation
// was applied.
func setScaleDownDisabledAnnotation(hostnames []string) ([]string, error) {
// get the node reference - first need the hostname
var (
node *corev1.Node
err error
key = "cluster-autoscaler.kubernetes.io/scale-down-disabled"
annotated = []string{}
)
clientset, err := kubeGetClientset()
if err != nil {
log.Fatalf("Error getting kubernetes connection: %v", err)
}
if clientset == nil {
return annotated, nil
}
nodes := clientset.CoreV1().Nodes()
for _, h := range hostnames {
node, err = nodes.Get(h, v1.GetOptions{})
if err != nil {
return annotated, fmt.Errorf("Unexpected error getting kubernetes node %s: %v", h, err)
}
annotations := node.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
if value := annotations[key]; value != "true" {
annotations[key] = "true"
node.SetAnnotations(annotations)
if _, err := nodes.Update(node); err != nil {
return annotated, fmt.Errorf("Unexpected error updating kubernetes node %s: %v", h, err)
}
annotated = append(annotated, h)
}
}
return annotated, nil
}

// removeScaleDownDisabledAnnotation removes the "cluster-autoscaler.kubernetes.io/scale-down-disabled"
// annotation from the given nodes, if present.
func removeScaleDownDisabledAnnotation(hostnames []string) error {
// get the node reference - first need the hostname
var (
node *corev1.Node
err error
key = "cluster-autoscaler.kubernetes.io/scale-down-disabled"
)
clientset, err := kubeGetClientset()
if err != nil {
log.Fatalf("Error getting kubernetes connection: %v", err)
}
if clientset == nil {
return nil
}
nodes := clientset.CoreV1().Nodes()
for _, h := range hostnames {
node, err = nodes.Get(h, v1.GetOptions{})
if err != nil {
return fmt.Errorf("Unexpected error getting kubernetes node %s: %v", h, err)
}
annotations := node.GetAnnotations()
if _, ok := annotations[key]; ok {
delete(annotations, key)
node.SetAnnotations(annotations)
if _, err := nodes.Update(node); err != nil {
return fmt.Errorf("Unexpected error updating kubernetes node %s: %v", h, err)
}
}
}
return nil
}
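
The cluster-autoscaler skips any node annotated with cluster-autoscaler.kubernetes.io/scale-down-disabled: "true" when choosing scale-down candidates, so applying it keeps the autoscaler from deleting nodes in the middle of a roll. A minimal usage sketch of the helper above (hostname hypothetical):

annotated, err := setScaleDownDisabledAnnotation([]string{"ip-10-0-1-23.ec2.internal"})
if err != nil {
	log.Printf("Unable to set disabled scale down annotations: %v", err)
}
log.Printf("scale-down disabled on: %v", annotated)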
5 changes: 3 additions & 2 deletions main.go
@@ -21,8 +21,9 @@ func main() {

// get config env
ignoreDaemonSets := os.Getenv("ROLLER_IGNORE_DAEMONSETS") != "false"
deleteLocalData := os.Getenv("ROLLER_DELETE_LOCAL_DATA") != "false"
// get a kube connection
readinessHandler, err := kubeGetReadinessHandler(ignoreDaemonSets)
readinessHandler, err := kubeGetReadinessHandler(ignoreDaemonSets, deleteLocalData)
if err != nil {
log.Fatalf("Error getting kubernetes readiness handler when required: %v", err)
}
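
Both flags use the same convention: the feature is on unless the variable is set to the literal string "false", so an unset variable means enabled. A sketch of the equivalent parsing as a helper (name hypothetical):

// envEnabled reports whether a boolean env flag is on;
// anything other than the exact string "false" (including unset) counts as on.
func envEnabled(name string) bool {
	return os.Getenv(name) != "false"
}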
@@ -43,7 +44,7 @@

// infinite loop
for {
err = adjust(asgList, ec2Svc, asgSvc, readinessHandler, originalDesired)
err := adjust(asgList, ec2Svc, asgSvc, readinessHandler, originalDesired)
if err != nil {
log.Printf("Error adjusting AutoScaling Groups: %v", err)
}
33 changes: 32 additions & 1 deletion roller.go
@@ -2,6 +2,7 @@ package main

import (
"fmt"
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
@@ -31,12 +32,20 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
}
// if there are no outdated instances skip updating
if len(oldI) == 0 {
log.Printf("[%s] ok\n", *asg.AutoScalingGroupName)
err := ensureNoScaleDownDisabledAnnotation(ec2Svc, mapInstancesIds(asg.Instances))
if err != nil {
log.Printf("[%s] Unable to update node annotations: %v\n", *asg.AutoScalingGroupName, err)
}
continue
}

log.Printf("[%s] need updates: %d\n", *asg.AutoScalingGroupName, len(oldI))

asgMap[*asg.AutoScalingGroupName] = asg
instances = append(instances, oldI...)
instances = append(instances, newI...)

}
// no instances no work needed
if len(instances) == 0 {
@@ -59,9 +68,14 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
// keep keyed references to the ASGs
for _, asg := range asgMap {
newDesiredA, newOriginalA, terminateID, err := calculateAdjustment(asg, ec2Svc, hostnameMap, readinessHandler, originalDesired[*asg.AutoScalingGroupName])
log.Printf("[%s] desired: %d original: %d", *asg.AutoScalingGroupName, newDesiredA, newOriginalA)
if err != nil {
log.Printf("[%s] error: %v\n", *asg.AutoScalingGroupName, err)
}
newDesired[*asg.AutoScalingGroupName] = newDesiredA
newOriginalDesired[*asg.AutoScalingGroupName] = newOriginalA
if terminateID != "" {
log.Printf("[%s] Scheduled termination: %s", *asg.AutoScalingGroupName, terminateID)
newTerminate[*asg.AutoScalingGroupName] = terminateID
}
errors[asg.AutoScalingGroupName] = err
@@ -72,13 +86,15 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
}
// adjust current desired
for asg, desired := range newDesired {
log.Printf("[%s] set desired instances: %d\n", asg, desired)
err = setAsgDesired(asgSvc, asgMap[asg], desired)
if err != nil {
return fmt.Errorf("Error setting desired to %d for ASG %s: %v", desired, asg, err)
}
}
// terminate nodes
for asg, id := range newTerminate {
log.Printf("[%s] terminating node: %s\n", asg, id)
// all new config instances are ready, terminate an old one
err = awsTerminateNode(asgSvc, id)
if err != nil {
@@ -88,6 +104,16 @@ func adjust(asgList []string, ec2Svc ec2iface.EC2API, asgSvc autoscalingiface.Au
return nil
}

// ensureNoScaleDownDisabledAnnotation removes any "cluster-autoscaler.kubernetes.io/scale-down-disabled"
// annotation from the given nodes once no further update is required.
func ensureNoScaleDownDisabledAnnotation(ec2Svc ec2iface.EC2API, ids []string) error {
hostnames, err := awsGetHostnames(ec2Svc, ids)
if err != nil {
return fmt.Errorf("Unable to get aws hostnames for ids %v: %v", ids, err)
}
return removeScaleDownDisabledAnnotation(hostnames)
}
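
Without this cleanup pass, nodes annotated during an earlier roll would keep the scale-down-disabled annotation forever and the cluster-autoscaler could never remove them; adjust therefore clears it for every ASG that has no outdated instances left.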

// calculateAdjustment calculates the new settings for the desired number, and which node (if any) to terminate
// this makes no actual adjustment, only calculates what new settings should be
// returns:
@@ -131,6 +157,7 @@ func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnam
if *i.HealthStatus == healthy {
readyCount++
}

}
if int64(readyCount) < originalDesired+1 {
return desired, originalDesired, "", nil
@@ -158,12 +185,16 @@ func calculateAdjustment(asg *autoscaling.Group, ec2Svc ec2iface.EC2API, hostnam
for _, i := range ids {
hostnames = append(hostnames, hostnameMap[i])
}
_, err = setScaleDownDisabledAnnotation(hostnames)
if err != nil {
log.Printf("Unable to set disabled scale down annotations: %v", err)
}
unReadyCount, err = readinessHandler.getUnreadyCount(hostnames, ids)
if err != nil {
return desired, originalDesired, "", fmt.Errorf("Error getting readiness new node status: %v", err)
}
if unReadyCount > 0 {
return desired, originalDesired, "", nil
return desired, originalDesired, "", fmt.Errorf("[%s] Nodes not ready: %d", *asg.AutoScalingGroupName, unReadyCount)
}
}
candidate := *oldInstances[0].InstanceId
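
Note the behavioral change in the final hunk: when newly added nodes are still unready, calculateAdjustment now returns an error naming the ASG and the unready count instead of returning nil silently, so adjust logs the condition through its per-ASG error handling and the main loop retries on its next iteration.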
