Skip to content

Commit

Permalink
automatically reconciles nodes allocation settings once on node start…
Browse files Browse the repository at this point in the history
…up, removes omitempty flag on necessary field, aligns indexing fo rreplicas vs pod names
  • Loading branch information
owen-d committed Feb 28, 2019
1 parent b6feca1 commit daf084f
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 43 deletions.
33 changes: 12 additions & 21 deletions cmd/handler/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,6 @@ import (
"strings"
)

/*
example:
{
"transient": {
"cluster": {
"routing": {
"allocation": {
"exclude": {
"_name": "mycluster-quorum-master-1"
}
}
}
}
}
}
*/

// TODO: executing these on all nodes definitely exposes us to race conditions where changes
// can be swallowed.
type ElasticAllocationSettings struct {
Expand All @@ -37,12 +20,12 @@ type ElasticAllocationSettings struct {
Routing struct {
Allocation struct {
Exclude struct {
Name string `json:"_name,omitempty"`
Name string `json:"_name"`
} `json:"exclude,omitempty"`
} `json:"allocation,omitempty"`
} `json:"routing,omitempty"`
} `json:"cluster,omitempty"`
} `json:"persistent,omitempty"`
} `json:"transient,omitempty"`
}

func ExistsIn(xs []string, val string) bool {
Expand All @@ -52,7 +35,15 @@ func ExistsIn(xs []string, val string) bool {
}
}
return false
}

func NonEmpty(xs []string) (res []string) {
for _, x := range xs {
if x != "" {
res = append(res, x)
}
}
return res
}

func (e *ElasticAllocationSettings) Excluded(name string) bool {
Expand All @@ -63,7 +54,7 @@ func (e *ElasticAllocationSettings) Exclude(name string) {
if e.Excluded(name) {
return
}
exclusions := strings.Split(e.Transient.Cluster.Routing.Allocation.Exclude.Name, ",")
exclusions := NonEmpty(strings.Split(e.Transient.Cluster.Routing.Allocation.Exclude.Name, ","))
exclusions = append(exclusions, name)
e.Transient.Cluster.Routing.Allocation.Exclude.Name = strings.Join(exclusions, ",")
}
Expand All @@ -73,7 +64,7 @@ func (e *ElasticAllocationSettings) Include(name string) {
return
}

exclusions := strings.Split(e.Transient.Cluster.Routing.Allocation.Exclude.Name, ",")
exclusions := NonEmpty(strings.Split(e.Transient.Cluster.Routing.Allocation.Exclude.Name, ","))
newExclusions := []string{}

for _, exclusion := range exclusions {
Expand Down
44 changes: 32 additions & 12 deletions cmd/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func Schedulable(conf util.SchedulableConfig, nodeName string) bool {
// Handler applies shard schedulability based on a config file.
// It also maintains a ticker which exits the current process if the current node is not schedulable and has no shards
func main() {
setup()
files, errs, configDone, err := watcher.ConfigMapChanges(opts.ConfigFile)
if err != nil {
bail(err)
Expand All @@ -42,17 +43,38 @@ func main() {
done := make(chan struct{})

go func() {
// create a ticker to ensure an initial routing update is performed.
// This is helpful when scaling back up after a node has been marked unschedulable.
// It allows the node to remark itself as schedulable in accordance with it's configmap on an initial mount.

ticker := time.NewTicker(time.Second * 10)

log("awaiting events")

for {

select {
case <-configDone:
log("watcher exhausted")
done <- struct{}{}
case err = <-errs:
log(err)
case <-ticker.C:
log(fmt.Sprintf("ensuring initial routing for node [%s]", opts.NodeName))
if err = UpdateRouting(); err != nil {
log(err)
} else {
// once the ticker-spawned update completes once, close it.
// Successive updates only need be performed in response
// to configmap changes
ticker.Stop()
}
case changed := <-files:
log("file changed:\n", string(changed))
if err = UpdateRouting(changed); err != nil {
if err = yaml.Unmarshal(changed, &conf); err != nil {
log(err)
}
if err = UpdateRouting(); err != nil {
log(err)
}
}
Expand Down Expand Up @@ -84,36 +106,34 @@ func main() {
<-done
}

func UpdateRouting(fileBytes []byte) (err error) {
if err = yaml.Unmarshal(fileBytes, &conf); err != nil {
return err
}

func UpdateRouting() (err error) {
settings, err := GetAllocationSettings(client)
if err != nil {
return err
}

schedulable := Schedulable(conf, opts.NodeName)
if !schedulable && !settings.Excluded(opts.NodeName) {
settings.Exclude(opts.NodeName)
log(fmt.Sprintf(
"[%s] marked as unschedulable but not marked as such in elastic, updating elastic...",
"[%s] marked as unschedulable but not marked as such in elastic, updating elastic with new settings:\n%+v",
opts.NodeName,
settings,
))
settings.Exclude(opts.NodeName)
} else if schedulable && settings.Excluded(opts.NodeName) {
settings.Include(opts.NodeName)
log(fmt.Sprintf(
"[%s] marked as schedulable but not marked as such in elastic, updating elastic...",
"[%s] marked as schedulable but not marked as such in elastic, updating elastic with new settings:\n%+v",
opts.NodeName,
settings,
))
settings.Include(opts.NodeName)
} else {
return nil
}

_, err = PutAllocationSettings(client, settings)

return nil
return err
}

func bail(err error) {
Expand All @@ -128,7 +148,7 @@ func log(args ...interface{}) {
fmt.Println(args...)
}

func init() {
func setup() {
_, err := flags.ParseArgs(&opts, os.Args)
if err != nil {
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion config/crds/elasticsearch_v1beta1_cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ spec:
type: array
storageClass:
type: string
unschedulable:
unschedulableIndices:
items:
format: int64
type: integer
Expand Down
2 changes: 1 addition & 1 deletion config/crds/elasticsearch_v1beta1_pool.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ spec:
type: array
storageClass:
type: string
unschedulable:
unschedulableIndices:
items:
format: int64
type: integer
Expand Down
2 changes: 1 addition & 1 deletion config/crds/elasticsearch_v1beta1_quorum.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ spec:
type: array
storageClass:
type: string
unschedulable:
unschedulableIndices:
items:
format: int64
type: integer
Expand Down
6 changes: 3 additions & 3 deletions config/samples/elasticsearch_v1beta1_cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ spec:
roles:
- master
- data
replicas: 2
replicas: 3
storageClass: "standard"
resources:
requests:
memory: "256Mi"
cpu: "200m"
memory: "512Mi"
cpu: "300m"
# memory: "1Gi"
# cpu: "500m"
# - name: data
Expand Down
3 changes: 3 additions & 0 deletions hack/sample-data/shakespeare.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"settings": {
"index": {
"refresh_interval": "30s"
},
"number_of_shards": 2,
"number_of_replicas": 1
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/elasticsearch/v1beta1/pool_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// PoolSpec defines the desired state of Pool
type PoolSpec struct {
Replicas int32 `json:"replicas"`
Unschedulable []int `json:"unschedulable,omitempty"`
Unschedulable []int `json:"unschedulableIndices,omitempty"`
Name string `json:"name"`
// +kubebuilder:validation:Enum=master,data,ingest
Roles []string `json:"roles,omitempty"`
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/util/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ func ToPools(
) (res, forDeletion []elasticsearchv1beta1.PoolSpec, err error) {

injectUnschedulableNodes := func(stats scheduler.PoolStats, spec *elasticsearchv1beta1.PoolSpec) {
if idx := int(stats.ScheduleReplicas); stats.LastUnschedulable &&
!spec.ContainsUnschedulable(idx) {
// pods are zero-indexed while replica requests are 1-indexed. align them.
idx := int(stats.ScheduleReplicas - 1)

if stats.LastUnschedulable && !spec.ContainsUnschedulable(idx) {
spec.Unschedulable = append(spec.Unschedulable, idx)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/util/stateful_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func ReconcileStatefulSet(
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("512Mi"),
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
StorageClassName: storageClass,
Expand Down

0 comments on commit daf084f

Please sign in to comment.