Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(esClientDrain): enhance Drain ES Client function #168

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,28 @@ const (
defaultMetricsAddress = ":7979"
defaultClientGoTimeout = 30 * time.Second
defaultClusterDNSZone = "cluster.local."
defaultRetryCount = "999"
defaultRetryWaitTime = "10s"
defaultRetryMaxWaitTime = "30s"
)

var (
config struct {
Interval time.Duration
AutoscalerInterval time.Duration
APIServer *url.URL
PodSelectors Labels
PriorityNodeSelectors Labels
MetricsAddress string
ClientGoTimeout time.Duration
Debug bool
OperatorID string
Namespace string
ClusterDNSZone string
ElasticsearchEndpoint *url.URL
Interval time.Duration
AutoscalerInterval time.Duration
APIServer *url.URL
PodSelectors Labels
PriorityNodeSelectors Labels
MetricsAddress string
ClientGoTimeout time.Duration
Debug bool
OperatorID string
Namespace string
ClusterDNSZone string
ElasticsearchEndpoint *url.URL
EsClientRetryCount int
EsClientRetryWaitTime time.Duration
EsClientRetryMaxWaitTime time.Duration
}
)

Expand Down Expand Up @@ -71,6 +77,9 @@ func main() {
URLVar(&config.ElasticsearchEndpoint)
kingpin.Flag("namespace", "Limit operator to a certain namespace").
Default(v1.NamespaceAll).StringVar(&config.Namespace)
kingpin.Flag("esclient-retry-count", "Count of retry operations conducted by EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryCount).IntVar(&config.EsClientRetryCount)
kingpin.Flag("esclient-retry-waittime", "Wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryWaitTime).DurationVar(&config.EsClientRetryWaitTime)
kingpin.Flag("esclient-retry-max-waittime", "Max wait time between two operations of EsClient. Used when the operator is waiting for an ES cluster condition, e.g. shards relocation.").Default(defaultRetryMaxWaitTime).DurationVar(&config.EsClientRetryMaxWaitTime)

kingpin.Parse()

Expand Down Expand Up @@ -98,6 +107,9 @@ func main() {
config.Namespace,
config.ClusterDNSZone,
config.ElasticsearchEndpoint,
config.EsClientRetryCount,
config.EsClientRetryWaitTime,
config.EsClientRetryMaxWaitTime,
)

go handleSigterm(cancel)
Expand Down
16 changes: 13 additions & 3 deletions operator/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type ElasticsearchOperator struct {
elasticsearchEndpoint *url.URL
operating map[types.UID]operatingEntry
sync.Mutex
recorder kube_record.EventRecorder
recorder kube_record.EventRecorder
esClientRetryConfig *RetryConfig
}

type operatingEntry struct {
Expand All @@ -66,6 +67,9 @@ func NewElasticsearchOperator(
namespace,
clusterDNSZone string,
elasticsearchEndpoint *url.URL,
esClientRetryCount int,
esClientRetryWaitTime,
esClientRetryMaxWaitTime time.Duration,
) *ElasticsearchOperator {
return &ElasticsearchOperator{
logger: log.WithFields(
Expand All @@ -84,6 +88,11 @@ func NewElasticsearchOperator(
elasticsearchEndpoint: elasticsearchEndpoint,
operating: make(map[types.UID]operatingEntry),
recorder: createEventRecorder(client),
esClientRetryConfig: &RetryConfig{
ClientRetryCount: esClientRetryCount,
ClientRetryWaitTime: esClientRetryWaitTime,
ClientRetryMaxWaitTime: esClientRetryMaxWaitTime,
},
}
}

Expand Down Expand Up @@ -497,8 +506,8 @@ func (r *EDSResource) ensureService(ctx context.Context) error {
}

// Drain drains a pod for Elasticsearch data.
func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod) error {
return r.esClient.Drain(ctx, pod)
func (r *EDSResource) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error {
return r.esClient.Drain(ctx, pod, config)
}

// PreScaleDownHook ensures that the IndexReplicas is set as defined in the EDS
Expand Down Expand Up @@ -641,6 +650,7 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete
interval: o.interval,
logger: logger,
recorder: o.recorder,
esClientRetryConfig: o.esClientRetryConfig,
}

rs := &EDSResource{
Expand Down
12 changes: 10 additions & 2 deletions operator/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
"k8s.io/apimachinery/pkg/types"
)

const (
defaultRetryCount = 999
defaultRetryWaitTime = 10 * time.Second
defaultRetryMaxWaitTime = 30 * time.Second
)

func TestHasOwnership(t *testing.T) {
eds := &zv1.ElasticsearchDataSet{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -43,7 +49,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) {
faker := &clientset.Clientset{
Interface: fake.NewSimpleClientset(),
}
esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil)
esOperator := NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", "cluster.local.", nil,
defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime)

eds := &zv1.ElasticsearchDataSet{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -59,7 +66,8 @@ func TestGetElasticsearchEndpoint(t *testing.T) {
customEndpoint, err := url.Parse(customURL)
assert.NoError(t, err)

esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint)
esOperator = NewElasticsearchOperator(faker, nil, 1*time.Second, 1*time.Second, "", "", ".cluster.local.", customEndpoint,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's pass the RetryConfig from here instead of individual arguments.

defaultRetryCount, defaultRetryWaitTime, defaultRetryMaxWaitTime)
url = esOperator.getElasticsearchEndpoint(eds)
assert.Equal(t, customURL, url.String())
}
Expand Down
104 changes: 87 additions & 17 deletions operator/es_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
v1 "k8s.io/api/core/v1"
)

// TODO make configurable as flags.
var (
defaultRetryCount = 999
defaultRetryWaitTime = 10 * time.Second
defaultRetryMaxWaitTime = 30 * time.Second
)
// Restry Configuration
type RetryConfig struct {
ClientRetryCount int
ClientRetryWaitTime time.Duration
ClientRetryMaxWaitTime time.Duration
}

// ESClient is a pod drainer which can drain data from Elasticsearch pods.
type ESClient struct {
Expand Down Expand Up @@ -91,7 +91,7 @@ func (c *ESClient) logger() *log.Entry {
}

// Drain drains data from an Elasticsearch pod.
func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod, config *RetryConfig) error {

c.logger().Info("Ensuring cluster is in green state")

Expand All @@ -111,11 +111,15 @@ func (c *ESClient) Drain(ctx context.Context, pod *v1.Pod) error {
}

c.logger().Info("Waiting for draining to finish")
return c.waitForEmptyEsNode(ctx, pod)
return c.waitForEmptyEsNode(ctx, pod, config)
}

func (c *ESClient) Cleanup(ctx context.Context) error {

// prevent ESClient from execute another operations on excludeIPList in ES
c.mux.Lock()
defer c.mux.Unlock()

// 1. fetch IPs from _cat/nodes
nodes, err := c.GetNodes()
if err != nil {
Expand Down Expand Up @@ -203,13 +207,14 @@ func (c *ESClient) getClusterSettings() (*ESSettings, error) {
// adds the podIP to Elasticsearch exclude._ip list
func (c *ESClient) excludePodIP(pod *v1.Pod) error {

// prevent ESClient from execute another operations on excludeIPList in ES
c.mux.Lock()
defer c.mux.Unlock()

podIP := pod.Status.PodIP

esSettings, err := c.getClusterSettings()
if err != nil {
c.mux.Unlock()
return err
}

Expand All @@ -220,6 +225,7 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error {
if excludeString != "" {
ips = strings.Split(excludeString, ",")
}

var foundPodIP bool
for _, ip := range ips {
if ip == podIP {
Expand All @@ -233,7 +239,6 @@ func (c *ESClient) excludePodIP(pod *v1.Pod) error {
err = c.setExcludeIPs(strings.Join(ips, ","))
}

c.mux.Unlock()
return err
}

Expand All @@ -256,6 +261,45 @@ func (c *ESClient) setExcludeIPs(ips string) error {
return nil
}

// remove the podIP from Elasticsearch exclude._ip list
func (c *ESClient) undoExcludePodIP(pod *v1.Pod) error {

// prevent ESClient from execute another operations on excludeIPList in ES
c.mux.Lock()
defer c.mux.Unlock()
s-vkropotko marked this conversation as resolved.
Show resolved Hide resolved

podIP := pod.Status.PodIP

esSettings, err := c.getClusterSettings()
if err != nil {
return err
}

excludedIPsString := esSettings.Transient.Cluster.Routing.Allocation.Exclude.IP
excludedIPs := strings.Split(excludedIPsString, ",")

// create a new array with excludedIP without provided Pod IP address
var newExcludedIPs []string
for _, excludeIP := range excludedIPs {
if excludeIP != podIP {
newExcludedIPs = append(newExcludedIPs, excludeIP)
sort.Strings(newExcludedIPs)
}
}

newExcludedIPsString := strings.Join(newExcludedIPs, ",")
if newExcludedIPsString != excludedIPsString {
c.logger().Infof("Setting exclude list to '%s'", newExcludedIPsString)

err = c.setExcludeIPs(newExcludedIPsString)
if err != nil {
return err
}
}

return nil
}

func (c *ESClient) updateAutoRebalance(value string) error {
resp, err := resty.New().R().
SetHeader("Content-Type", "application/json").
Expand All @@ -276,23 +320,26 @@ func (c *ESClient) updateAutoRebalance(value string) error {
}

// repeatedly query shard allocations to ensure success of drain operation.
func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod, config *RetryConfig) error {
// TODO: implement context handling
podIP := pod.Status.PodIP
_, err := resty.New().
SetRetryCount(defaultRetryCount).
SetRetryWaitTime(defaultRetryWaitTime).
SetRetryMaxWaitTime(defaultRetryMaxWaitTime).
resp, err := resty.New().
SetRetryCount(config.ClientRetryCount).
SetRetryWaitTime(config.ClientRetryWaitTime).
SetRetryMaxWaitTime(config.ClientRetryMaxWaitTime).
AddRetryCondition(
// It is expected to return (bool, error) pair. Resty will retry
// in case condition returns true or non nil error.
func(r *resty.Response) (bool, error) {
if !r.IsSuccess() {
return true, nil
}

var shards []ESShard
err := json.Unmarshal(r.Body(), &shards)
if err != nil {
return true, err
}
// shardIP := make(map[string]bool)
remainingShards := 0
for _, shard := range shards {
if shard.IP == podIP {
Expand All @@ -312,9 +359,32 @@ func (c *ESClient) waitForEmptyEsNode(ctx context.Context, pod *v1.Pod) error {
},
).R().
Get(c.Endpoint.String() + "/_cat/shards?h=index,ip&format=json")

if err != nil {
return err
}

if !resp.IsSuccess() {
return fmt.Errorf("HTTP endpoint responded with not expected status code %d", resp.StatusCode())
}

var shards []ESShard
err = json.Unmarshal(resp.Body(), &shards)
if err != nil {
return err
}

for _, shard := range shards {
if shard.IP == podIP {
err = fmt.Errorf("Cannot migrate shards from pod '%s' with IP '%s' within provided intervals", pod.ObjectMeta.Name, pod.Status.PodIP)
// if we cannot remove node than return it back active nodes pool
if errExclude := c.undoExcludePodIP(pod); errExclude != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I understand the motivation for this, I'm not sure this is the best place to handle the issue you describe.

  • If we hit the drain timeout the right thing would be to retry draining which is handled elsewhere in operator.go. Otherwise we can end up draining a bit, re-enabling the pod so it gets shards back, and then attempt to drain it once more.
  • If we hit the drain timeout and the operator decides it's better to scale out and not drain, as I understand the issue described that this aims to solve, then I think it would be better to undo the draining in the operator.go code and not have it as a side effect of the es client drain.

Copy link
Author

@s-vkropotko s-vkropotko Apr 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't understand what do you suggest.
The current implementation looks like this:
separate autoscaler -> getScalingDirection -> updateEDS(parameters)
ES cluster -> run operator -> run operator.operate() in some interval -> ... -> operatePods can run Drain func directly or from rescaleStatefulSet func -> control is transferred elaticserach.go -> Drain -> waiting for draining with some retry settings

The idea of the current code to provide users the ability to stop infinity waiting for draining (this can happen mostly because of configuration error between EDS and ES indices or some problems with pods/cluster).
My assumption was next: if users specify wrong settings and Drain stopped work then users can check logs and fix settings for their case.
ES-Op guarantees only that Drain will have a rollback in case of failure.

If we hit the drain timeout the right thing would be to retry 
draining which is handled elsewhere in operator.go.  
Otherwise, we can end up draining a bit, re-enabling the pod so it gets shards back, 
and then attempt to drain it once more.

Am I right that you suggest adding an additional setting for this retry? or you suggest to rid of the retry with Resty lib?
An additional setting on the operator level will produce kinda identical behavior as a specified already existed Resty settings besides checking cluster status. But this also introduces complexity to calculate the final max waiting interval.

If we hit the drain timeout and the operator decides it's better to scale out and not drain, as I understand the issue described that this aims to solve, then I think it would be better to undo the draining in the operator.go code and not have it as a side effect of the es client drain.

So do you suggest just wrap undoExcludePodIP func into new func, .e.g. undoDrain and run it with bound to Drain in operator.go?
Is there a case when Es-Op received an error from Drain func and doesn't need to run undoDrain beside the case when an additional retry option for Drain existed?
Also, I can try to implement the next functionality:

  • if Drain is unsuccessful -> leave the pod in the excluded list and set annotation/status on EDS
  • During the next operator run check that the previous status exists and if ES-Op doesn't decide to try to Drain it again -> run undoDrain for it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikkeloscar hey, could you please check my comment?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikkeloscar I would be good with this behaviour because it unblocks other cluster operations, but the users should have alerting on "Cannot migrate shards from pod" in the logs, because apparently there's some wrong configuration in the cluster to be fixed manually.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, I'm sorry for not following up here earlier.

My main point was that the code added here in my mind is operator code and should belong in the logic of operator.go and not in the client for calling ES.

Right now, what would happen on a drain timeout is that the drain would be undone, by the new code added here, and the next time the operator loop runs, it would select the exact same pod again, because it has the drain annotation, and it would again start draining, so you basically don't win anything by having it here is my point.

One clear problem that I see with the current behavior (before this PR) is that on scale-out we don't stop the draining of the pod and then undo the drain, which would make sense if we were only draining the pod for scaledown previously. However, if the pod is being drained because of a node being shut down, then draining is exactly what we want, and it IMO shouldn't be stopped.

Apart from the scale-out scenario, I don't completely follow the idea of stopping the "infinite" drain in case the cluster is misconfigured. What would be the alternative? To stop the draining and then do what? I agree with @otrosien that the operators/owners of the EDS should have some sort of alert on these kind of misconfigurations, because it's not something the es-operator is able to handle by itself, so from my perspective it's best it just tries to drain and then a human can get alerted if this drain takes longer that expected (maybe we can create better logs or other metrics, to make it easier to create such alerts?).
@otrosien you mentioned that it "unblocks other cluster operations". Are there other operations than what I mentioned which are blocked during the drain? Maybe I'm missing some knowledge about this and are judging the thing wrong?

Copy link
Author

@s-vkropotko s-vkropotko Jun 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, sorry for the delayed response.
Let's clarify several things:

  1. I think I was no so clear with describing current behavior, it's not an infinity cycle to wait for the Drain condition.
    Current behavior works next way:
    function waitForEmptyEsNode conducts up to 999 retry operations with increasing interval (from SetRetryWaitTime to SetRetryMaxWaitTime values, or 10 -> 30 seconds in our case).
    So this requires hours till it will be finished.
    Then in waitForEmptyEsNode we ignore response
    https://github.com/zalando-incubator/es-operator/pull/168/files#diff-67b73dbcc9b2b625a0749cfc62f02ad480ccd9d40abd4664e1455f01ff446965L282
    And check only err which will be nil because unsuccessful retries are not an error condition.
    (instead of that we need to check another parameter - https://github.com/zalando-incubator/es-operator/pull/168/files#diff-67b73dbcc9b2b625a0749cfc62f02ad480ccd9d40abd4664e1455f01ff446965R367)
    After this we remove a target node(pod) from the K8S cluster despite this node was not correctly removed from ES cluster.
  2. about unblocking other cluster operations, for example:
    We have a cluster that is stuck in scale-down operation, e.g. cannot migrate shards to other data nodes.
    If the user updated an EDS(increased minReplicas) and the current scaling direction would be UP then in the current implementation (before this PR) ES-Op will continue doing retries for scaling down.
    If a user(or some system conditions) will restart ES-Op in this case then ES-Op will start to do the expected scaling up the operation. But here we have another issue - the IP address of the node that ES-Op tried to remove will be presented in exclude_list.

In the PR implementation, it will work as before (I didn't change existed Retry configuration) but we will have a graceful exit(ref 1) from failed Drain. Also, users will have the ability to configure the custom timeout for Drain which is useful for cases with regular scale in/down operations(ref 2), like scaling EDS by Cronjobs.

So according to your first comment, I think I can leave a fix for 1) and for 2) - add function undoDrain which contains undoExclude to operator.go

Copy link
Author

@s-vkropotko s-vkropotko Jun 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we can add an additional parameter to control behavior in the case of undoDrain
@mikkeloscar

return fmt.Errorf("during handling request error: '%v' another error has been raised '%v'", err, errExclude)
}
return err
}
}

return nil
}

Expand Down Expand Up @@ -452,7 +522,7 @@ func (c *ESClient) CreateIndex(indexName, groupName string, shards, replicas int
SetHeader("Content-Type", "application/json").
SetBody([]byte(
fmt.Sprintf(
`{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d",
`{"settings": {"index" : {"number_of_replicas" : "%d", "number_of_shards": "%d",
"routing.allocation.include.group": "%s"}}}`,
replicas,
shards,
Expand Down
Loading