From 4258634ccf2f675e00c36b6fadbd2b927901b5e2 Mon Sep 17 00:00:00 2001 From: "Tan N. Le" Date: Thu, 28 Jul 2022 19:27:53 -0700 Subject: [PATCH] Capacity report (#18) - pull capacity report via /offers endpoint. - calculate how many tasks (with resource and constraints) can be fit in the cluster. examples of using the above 2 features are in aurora-scheduler/australis#33 --- docker-compose.yml | 53 ++++- offer.go | 434 ++++++++++++++++++++++++++++++++++++++ realis.go | 6 + realis_e2e_test.go | 509 ++++++++++++++++++++++++++++++++++++++++++++- util.go | 20 ++ 5 files changed, 1012 insertions(+), 10 deletions(-) create mode 100644 offer.go diff --git a/docker-compose.yml b/docker-compose.yml index 348b85b..6fd40ba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,10 +41,11 @@ services: MESOS_MASTER: zk://192.168.33.2:2181/mesos MESOS_CONTAINERIZERS: docker,mesos MESOS_PORT: 5051 - MESOS_HOSTNAME: localhost + MESOS_HOSTNAME: agent-one MESOS_RESOURCES: ports(*):[11000-11999] MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' MESOS_WORK_DIR: /tmp/mesos + MESOS_ATTRIBUTES: 'host:agent-one;rack:1;zone:west' networks: aurora_cluster: ipv4_address: 192.168.33.4 @@ -55,6 +56,56 @@ services: depends_on: - zk + agent-two: + image: quay.io/aurorascheduler/mesos-agent:1.9.0 + pid: host + restart: on-failure + ports: + - "5052:5051" + environment: + MESOS_MASTER: zk://192.168.33.2:2181/mesos + MESOS_CONTAINERIZERS: docker,mesos + MESOS_PORT: 5051 + MESOS_HOSTNAME: agent-two + MESOS_RESOURCES: ports(*):[11000-11999] + MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' + MESOS_WORK_DIR: /tmp/mesos + MESOS_ATTRIBUTES: 'host:agent-two;rack:2;zone:west' + networks: + aurora_cluster: + ipv4_address: 192.168.33.5 + + volumes: + - /sys/fs/cgroup:/sys/fs/cgroup + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - zk + + agent-three: + image: quay.io/aurorascheduler/mesos-agent:1.9.0 + pid: host + restart: on-failure + ports: + - "5053:5051" + environment: + MESOS_MASTER: zk://192.168.33.2:2181/mesos + MESOS_CONTAINERIZERS: docker,mesos + MESOS_PORT: 5051 + MESOS_HOSTNAME: agent-three + MESOS_RESOURCES: ports(*):[11000-11999] + MESOS_SYSTEMD_ENABLE_SUPPORT: 'false' + MESOS_WORK_DIR: /tmp/mesos + MESOS_ATTRIBUTES: 'host:agent-three;rack:2;zone:west;dedicated:vagrant/bar' + networks: + aurora_cluster: + ipv4_address: 192.168.33.6 + + volumes: + - /sys/fs/cgroup:/sys/fs/cgroup + - /var/run/docker.sock:/var/run/docker.sock + depends_on: + - zk + aurora-one: image: quay.io/aurorascheduler/scheduler:0.25.0 pid: host diff --git a/offer.go b/offer.go new file mode 100644 index 0000000..6f5346f --- /dev/null +++ b/offer.go @@ -0,0 +1,434 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package realis + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" +) + +// Offers on [aurora-scheduler]/offers endpoint +type Offer struct { + ID struct { + Value string `json:"value"` + } `json:"id"` + FrameworkID struct { + Value string `json:"value"` + } `json:"framework_id"` + AgentID struct { + Value string `json:"value"` + } `json:"agent_id"` + Hostname string `json:"hostname"` + URL struct { + Scheme string `json:"scheme"` + Address struct { + Hostname string `json:"hostname"` + IP string `json:"ip"` + Port int `json:"port"` + } `json:"address"` + Path string `json:"path"` + Query []interface{} `json:"query"` + } `json:"url"` + Resources []struct { + Name string `json:"name"` + Type string `json:"type"` + Ranges struct { + Range []struct { + Begin int `json:"begin"` + End int `json:"end"` + } `json:"range"` + } `json:"ranges,omitempty"` + Role string `json:"role"` + Reservations []interface{} `json:"reservations"` + Scalar struct { + Value float64 `json:"value"` + } `json:"scalar,omitempty"` + } `json:"resources"` + Attributes []struct { + Name string `json:"name"` + Type string `json:"type"` + Text struct { + Value string `json:"value"` + } `json:"text"` + } `json:"attributes"` + ExecutorIds []struct { + Value string `json:"value"` + } `json:"executor_ids"` +} + +// hosts on [aurora-scheduler]/maintenance endpoint +type MaintenanceList struct { + Drained []string `json:"DRAINED"` + Scheduled []string `json:"SCHEDULED"` + Draining map[string][]string `json:"DRAINING"` +} + +type OfferCount map[float64]int64 +type OfferGroupReport map[string]OfferCount +type OfferReport map[string]OfferGroupReport + +// MaintenanceHosts list all the hosts under maintenance +func (c *Client) MaintenanceHosts() ([]string, error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify}, + } + + request := &http.Client{Transport: tr} + + resp, err := request.Get(fmt.Sprintf("%s/maintenance", c.GetSchedulerURL())) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + return nil, err + } + + var list MaintenanceList + + if err := json.Unmarshal(buf.Bytes(), &list); err != nil { + return nil, err + } + + hosts := append(list.Drained, list.Scheduled...) + + for drainingHost := range list.Draining { + hosts = append(hosts, drainingHost) + } + + return hosts, nil +} + +// Offers pulls data from /offers endpoint +func (c *Client) Offers() ([]Offer, error) { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: c.config.insecureSkipVerify}, + } + + request := &http.Client{Transport: tr} + + resp, err := request.Get(fmt.Sprintf("%s/offers", c.GetSchedulerURL())) + if err != nil { + return []Offer{}, err + } + defer resp.Body.Close() + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + return nil, err + } + + var offers []Offer + + if err := json.Unmarshal(buf.Bytes(), &offers); err != nil { + return []Offer{}, err + } + + return offers, nil +} + +// AvailOfferReport returns a detailed summary of offers available for use. +// For example, 2 nodes offer 32 cpus and 10 nodes offer 1 cpus. 
+func (c *Client) AvailOfferReport() (OfferReport, error) { + maintHosts, err := c.MaintenanceHosts() + if err != nil { + return nil, err + } + + maintHostSet := map[string]bool{} + for _, h := range maintHosts { + maintHostSet[h] = true + } + + // Get a list of offers + offers, err := c.Offers() + if err != nil { + return nil, err + } + + report := OfferReport{} + + for _, o := range offers { + if maintHostSet[o.Hostname] { + continue + } + + group := "non-dedicated" + for _, a := range o.Attributes { + if a.Name == "dedicated" { + group = a.Text.Value + break + } + } + + if _, ok := report[group]; !ok { + report[group] = map[string]OfferCount{} + } + + for _, r := range o.Resources { + + if _, ok := report[group][r.Name]; !ok { + report[group][r.Name] = OfferCount{} + } + + val := 0.0 + switch r.Type { + case "SCALAR": + val = r.Scalar.Value + case "RANGES": + for _, pr := range r.Ranges.Range { + val += float64(pr.End - pr.Begin + 1) + } + default: + return nil, fmt.Errorf("%s is not supported", r.Type) + } + + report[group][r.Name][val]++ + } + } + + return report, nil +} + +// FitTasks computes the number tasks can be fit in a list of offer +func (c *Client) FitTasks(taskConfig *aurora.TaskConfig, offers []Offer) (int64, error) { + // count the number of tasks per limit contraint: limit.name -> limit.value -> count + limitCounts := map[string]map[string]int64{} + for _, c := range taskConfig.Constraints { + if c.Constraint.Limit != nil { + limitCounts[c.Name] = map[string]int64{} + } + } + + request := ResourcesToMap(taskConfig.Resources) + + // validate resource request + if len(request) == 0 { + return -1, fmt.Errorf("Resource request %v must not be empty", request) + } + + isValid := false + for _, resVal := range request { + if resVal > 0 { + isValid = true + break + } + } + + if !isValid { + return -1, fmt.Errorf("Resource request %v is not valid", request) + } + + // pull the list of hosts under maintenance + maintHosts, err := c.MaintenanceHosts() + if err != nil { + return -1, err + } + + maintHostSet := map[string]bool{} + for _, h := range maintHosts { + maintHostSet[h] = true + } + + numTasks := int64(0) + + for _, o := range offers { + // skip the hosts under maintenance + if maintHostSet[o.Hostname] { + continue + } + + numTasksPerOffer := int64(-1) + + for resName, resVal := range request { + // skip as we can fit a infinite number of tasks with 0 demand. + if resVal == 0 { + continue + } + + avail := 0.0 + for _, r := range o.Resources { + if r.Name != resName { + continue + } + + switch r.Type { + case "SCALAR": + avail = r.Scalar.Value + case "RANGES": + for _, pr := range r.Ranges.Range { + avail += float64(pr.End - pr.Begin + 1) + } + default: + return -1, fmt.Errorf("%s is not supported", r.Type) + } + } + + numTasksPerResource := int64(avail / resVal) + + if numTasksPerResource < numTasksPerOffer || numTasksPerOffer < 0 { + numTasksPerOffer = numTasksPerResource + } + } + + numTasks += fitConstraints(taskConfig, &o, limitCounts, numTasksPerOffer) + } + + return numTasks, nil +} + +func fitConstraints(taskConfig *aurora.TaskConfig, + offer *Offer, + limitCounts map[string]map[string]int64, + numTasksPerOffer int64) int64 { + + // check dedicated attributes vs. 
constraints + if !isDedicated(offer, taskConfig.Job.Role, taskConfig.Constraints) { + return 0 + } + + limitConstraints := []*aurora.Constraint{} + + for _, c := range taskConfig.Constraints { + // look for corresponding attribute + attFound := false + for _, a := range offer.Attributes { + if a.Name == c.Name { + attFound = true + } + } + + // constraint not found in offer's attributes + if !attFound { + return 0 + } + + if c.Constraint.Value != nil && !valueConstraint(offer, c) { + // value constraint is not satisfied + return 0 + } else if c.Constraint.Limit != nil { + limitConstraints = append(limitConstraints, c) + limit := limitConstraint(offer, c, limitCounts) + + if numTasksPerOffer > limit && limit >= 0 { + numTasksPerOffer = limit + } + } + } + + // update limitCounts + for _, c := range limitConstraints { + for _, a := range offer.Attributes { + if a.Name == c.Name { + limitCounts[a.Name][a.Text.Value] += numTasksPerOffer + } + } + } + + return numTasksPerOffer +} + +func isDedicated(offer *Offer, role string, constraints []*aurora.Constraint) bool { + // get all dedicated attributes of an offer + dedicatedAtts := map[string]bool{} + for _, a := range offer.Attributes { + if a.Name == "dedicated" { + dedicatedAtts[a.Text.Value] = true + } + } + + if len(dedicatedAtts) == 0 { + return true + } + + // check if constraints are matching dedicated attributes + matched := false + for _, c := range constraints { + if c.Name == "dedicated" && c.Constraint.Value != nil { + found := false + + for _, v := range c.Constraint.Value.Values { + if dedicatedAtts[v] && strings.HasPrefix(v, fmt.Sprintf("%s/", role)) { + found = true + break + } + } + + if found { + matched = true + } else { + return false + } + } + } + + return matched +} + +// valueConstraint checks Value Contraints of task if the are matched by the offer. +// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/ +func valueConstraint(offer *Offer, constraint *aurora.Constraint) bool { + matched := false + + for _, a := range offer.Attributes { + if a.Name == constraint.Name { + for _, v := range constraint.Constraint.Value.Values { + matched = (a.Text.Value == v && !constraint.Constraint.Value.Negated) || + (a.Text.Value != v && constraint.Constraint.Value.Negated) + + if matched { + break + } + } + + if matched { + break + } + } + } + + return matched +} + +// limitConstraint limits the number of pods on a group which has the same attribute. 
+// more details can be found here https://aurora.apache.org/documentation/latest/features/constraints/ +func limitConstraint(offer *Offer, constraint *aurora.Constraint, limitCounts map[string]map[string]int64) int64 { + limit := int64(-1) + for _, a := range offer.Attributes { + // limit constraint found + if a.Name == constraint.Name { + curr := limitCounts[a.Name][a.Text.Value] + currLimit := int64(constraint.Constraint.Limit.Limit) + + if curr >= currLimit { + return 0 + } + + if currLimit-curr < limit || limit < 0 { + limit = currLimit - curr + } + } + } + + return limit +} diff --git a/realis.go b/realis.go index a7c536e..9d8428f 100644 --- a/realis.go +++ b/realis.go @@ -147,6 +147,8 @@ func NewClient(options ...ClientOption) (*Client, error) { return nil, errors.New("incomplete Options -- url, cluster.json, or Zookeeper address required") } + config.url = url + url, err = validateAuroraAddress(url) if err != nil { return nil, errors.Wrap(err, "unable to create realis object, invalid url") @@ -841,3 +843,7 @@ func (c *Client) RollbackJobUpdate(key aurora.JobUpdateKey, message string) erro } return nil } + +func (c *Client) GetSchedulerURL() string { + return c.config.url +} diff --git a/realis_e2e_test.go b/realis_e2e_test.go index 55fab19..bf95f72 100644 --- a/realis_e2e_test.go +++ b/realis_e2e_test.go @@ -325,6 +325,10 @@ func TestRealisClient_GetPendingReason(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } func TestRealisClient_CreateService_WithPulse_Thermos(t *testing.T) { @@ -410,6 +414,10 @@ pulseLoop: err = r.KillJob(job.JobKey()) assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } // Test configuring an executor that doesn't exist for CreateJob API @@ -454,7 +462,10 @@ func TestRealisClient_CreateService(t *testing.T) { // Kill task test task after confirming it came up fine err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) assert.NoError(t, err) } @@ -513,10 +524,17 @@ func TestRealisClient_ScheduleCronJob_Thermos(t *testing.T) { t.Run("TestRealisClient_DeschedulerCronJob_Thermos", func(t *testing.T) { err := r.DescheduleCronJob(job.JobKey()) assert.NoError(t, err) + + err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) }) } func TestRealisClient_StartMaintenance(t *testing.T) { - hosts := []string{"localhost"} + hosts := []string{"agent-one"} _, err := r.StartMaintenance(hosts...) assert.NoError(t, err) @@ -526,7 +544,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_SCHEDULED}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) @@ -542,7 +560,7 @@ func TestRealisClient_StartMaintenance(t *testing.T) { } func TestRealisClient_DrainHosts(t *testing.T) { - hosts := []string{"localhost"} + hosts := []string{"agent-one"} _, err := r.DrainHosts(hosts...) 
assert.NoError(t, err) @@ -552,7 +570,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) t.Run("TestRealisClient_MonitorNontransitioned", func(t *testing.T) { @@ -565,7 +583,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { // Assert monitor returned an error that was not nil, and also a list of the non-transitioned hosts assert.Error(t, err) - assert.Equal(t, map[string]bool{"localhost": true, "IMAGINARY_HOST": false}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true, "IMAGINARY_HOST": false}, hostResults) }) t.Run("TestRealisClient_EndMaintenance", func(t *testing.T) { @@ -584,7 +602,7 @@ func TestRealisClient_DrainHosts(t *testing.T) { } func TestRealisClient_SLADrainHosts(t *testing.T) { - hosts := []string{"localhost"} + hosts := []string{"agent-one"} policy := aurora.SlaPolicy{PercentageSlaPolicy: &aurora.PercentageSlaPolicy{Percentage: 50.0}} _, err := r.SLADrainHosts(&policy, 30, hosts...) @@ -599,7 +617,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) @@ -624,7 +642,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) @@ -640,7 +658,7 @@ func TestRealisClient_SLADrainHosts(t *testing.T) { []aurora.MaintenanceMode{aurora.MaintenanceMode_DRAINED, aurora.MaintenanceMode_DRAINING}, 1*time.Second, 50*time.Second) - assert.Equal(t, map[string]bool{"localhost": true}, hostResults) + assert.Equal(t, map[string]bool{"agent-one": true}, hostResults) assert.NoError(t, err) _, err = r.EndMaintenance(hosts...) 
@@ -681,6 +699,9 @@ func TestRealisClient_SessionThreadSafety(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) + success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) }() } @@ -767,6 +788,12 @@ func TestRealisClient_PartitionPolicy(t *testing.T) { assert.NoError(t, err) } + err = r.KillJob(job.JobKey()) + assert.NoError(t, err) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } func TestRealisClient_UpdateStrategies(t *testing.T) { @@ -831,6 +858,10 @@ func TestRealisClient_UpdateStrategies(t *testing.T) { assert.NoError(t, r.AbortJobUpdate(key, "Monitor timed out.")) } assert.NoError(t, r.KillJob(strategy.jobUpdate.JobKey())) + + success, err := r.MonitorInstances(strategy.jobUpdate.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) }) } } @@ -877,6 +908,10 @@ func TestRealisClient_BatchAwareAutoPause(t *testing.T) { } assert.NoError(t, r.AbortJobUpdate(key, "")) assert.NoError(t, r.KillJob(strategy.JobKey())) + + success, err := r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) } func TestRealisClient_GetJobSummary(t *testing.T) { @@ -924,4 +959,460 @@ func TestRealisClient_GetJobSummary(t *testing.T) { err = r.KillJob(job.JobKey()) assert.NoError(t, err) + + success, err = r.MonitorInstances(job.JobKey(), 0, 1*time.Second, 90*time.Second) + assert.True(t, success) + assert.NoError(t, err) +} + +func TestRealisClient_Offers(t *testing.T) { + var offers []realis.Offer + + // since offers are being recycled, it take a few tries to get all of them. + i := 0 + for ; len(offers) < 3 && i < 5; i++ { + offers, _ = r.Offers() + time.Sleep(5 * time.Second) + } + + assert.NotEqual(t, i, 5) +} + +func TestRealisClient_MaintenanceHosts(t *testing.T) { + offers, err := r.Offers() + assert.NoError(t, err) + + for i := 0; i < len(offers); i++ { + _, err := r.DrainHosts(offers[i].Hostname) + assert.NoError(t, err) + + hosts, err := r.MaintenanceHosts() + assert.Equal(t, i+1, len(hosts)) + } + + // clean up + for i := 0; i < len(offers); i++ { + _, err := r.EndMaintenance(offers[i].Hostname) + assert.NoError(t, err) + + // Monitor change to DRAINING and DRAINED mode + _, err = r.MonitorHostMaintenance( + []string{offers[i].Hostname}, + []aurora.MaintenanceMode{aurora.MaintenanceMode_NONE}, + 5*time.Second, + 10*time.Second) + assert.NoError(t, err) + } +} + +func TestRealisClient_AvailOfferReport(t *testing.T) { + var offers []realis.Offer + + i := 0 + for ; len(offers) < 3 && i < 5; i++ { + offers, _ = r.Offers() + time.Sleep(5 * time.Second) + } + + assert.NotEqual(t, i, 3) + + capacity, err := r.AvailOfferReport() + assert.NoError(t, err) + + // 2 groups for non-dedicated & dedicated + assert.Equal(t, 2, len(capacity)) + // 4 resources: cpus, disk, mem, ports + assert.Equal(t, 4, len(capacity["non-dedicated"])) +} + +func TestRealisClient_FitTasks(t *testing.T) { + var offers []realis.Offer + + i := 0 + for ; len(offers) < 3 && i < 5; i++ { + offers, _ = r.Offers() + time.Sleep(5 * time.Second) + } + + assert.NotEqual(t, i, 5) + + cpuPerOffer := 0.0 + for _, r := range offers[0].Resources { + if r.Name == "cpus" { + cpuPerOffer = r.Scalar.Value + } + } + + // make sure all offers have no running executor + for _, o := range offers { + assert.Equal(t, o.ExecutorIds[:0], o.ExecutorIds) + } + + validCpu := 
cpuPerOffer / 2 + inValidCpu := cpuPerOffer + 1 + gpu := int64(1) + + tests := []struct { + message string + role string + request aurora.Resource + constraints []*aurora.Constraint + expected int64 + isError bool + }{ + { + message: "task with gpu request", + role: "vagrant", + request: aurora.Resource{ + NumGpus: &gpu, + }, + expected: 0, + isError: false, + }, + { + message: "empty resource request", + role: "vagrant", + request: aurora.Resource{}, + expected: -1, + isError: true, + }, + { + message: "valid resource request", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + expected: 4, + isError: false, + }, + { + message: "invalid cpu request", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &inValidCpu, + }, + expected: 0, + isError: false, + }, + { + message: "dedicated constraint", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + + constraints: []*aurora.Constraint{ + { + Name: "dedicated", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"vagrant/bar"}, + }, + }, + }, + }, + expected: 2, + isError: false, + }, + { + message: "dedicated constraint with unauthorized role", + role: "unauthorized", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "dedicated", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"vagrant/bar"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "value constraint on zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"west"}, + }, + }, + }, + }, + expected: 4, + isError: false, + }, + { + message: "negative value constraint on zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: true, + Values: []string{"west"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "negative value constraint on host", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: true, + Values: []string{"agent-one"}, + }, + }, + }, + }, + expected: 2, + isError: false, + }, + { + message: "value constraint on unavailable zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"east"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "value constraint on unavailable attribute", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "os", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"windows"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "1 value constraint with 2 values", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: 
&aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"agent-one", "agent-two"}, + }, + }, + }, + }, + expected: 4, + isError: false, + }, + { + message: "2 value constraints", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"agent-one"}, + }, + }, + }, + { + Name: "rack", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"2"}, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "limit constraint on host", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 2, + isError: false, + }, + { + message: "limit constraint on zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 1, + isError: false, + }, + { + message: "limit constraint on zone & host", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + { + Name: "zone", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 1, + isError: false, + }, + { + message: "limit constraint on unavailable zone", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "gpu-host", // no host has gpu-host attribute + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 0, + isError: false, + }, + { + message: "limit & dedicated constraint", + role: "vagrant", + request: aurora.Resource{ + NumCpus: &validCpu, + }, + constraints: []*aurora.Constraint{ + { + Name: "dedicated", + Constraint: &aurora.TaskConstraint{ + Value: &aurora.ValueConstraint{ + Negated: false, + Values: []string{"vagrant/bar"}, + }, + }, + }, + { + Name: "host", + Constraint: &aurora.TaskConstraint{ + Limit: &aurora.LimitConstraint{ + Limit: 1, + }, + }, + }, + }, + expected: 1, + isError: false, + }, + } + + for _, tc := range tests { + task := aurora.NewTaskConfig() + task.Resources = []*aurora.Resource{&tc.request} + task.Constraints = tc.constraints + task.Job = &aurora.JobKey{ + Role: tc.role, + } + + numTasks, err := r.FitTasks(task, offers) + + if !tc.isError { + assert.NoError(t, err) + assert.Equal(t, tc.expected, numTasks, tc.message) + } else { + assert.Error(t, err) + } + } } diff --git a/util.go b/util.go index 90e3dcf..f993aaa 100644 --- a/util.go +++ b/util.go @@ -104,3 +104,23 @@ func calculateCurrentBatch(updatingInstances int32, batchSizes []int32) int { } return batchCount } + +func ResourcesToMap(resources []*aurora.Resource) map[string]float64 { + result := map[string]float64{} + + for _, resource := range resources { + if resource.NumCpus != nil { + result["cpus"] += *resource.NumCpus + } else if resource.RamMb != nil { + result["mem"] += float64(*resource.RamMb) + } else if resource.DiskMb != nil { + 
result["disk"] += float64(*resource.DiskMb) + } else if resource.NamedPort != nil { + result["ports"]++ + } else if resource.NumGpus != nil { + result["gpus"] += float64(*resource.NumGpus) + } + } + + return result +}