Skip to content

Commit

Permalink
yarn-operator: add ut for controller (koordinator-sh#45)
Browse files Browse the repository at this point in the history
Signed-off-by: 佑祎 <[email protected]>
  • Loading branch information
zwzhang0107 authored Oct 26, 2023
1 parent 8e3b56e commit d347d17
Show file tree
Hide file tree
Showing 7 changed files with 518 additions and 58 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ require (
require (
github.com/gin-gonic/gin v1.8.1
github.com/go-resty/resty/v2 v2.7.0
github.com/golangplus/testing v1.0.0
github.com/opencontainers/runc v1.1.6
github.com/stretchr/testify v1.8.2
)

require (
Expand Down Expand Up @@ -114,7 +114,6 @@ require (
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/golang-jwt/jwt/v4 v4.2.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/golangplus/bytes v1.0.0 // indirect
github.com/google/cadvisor v0.44.1 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect
Expand Down Expand Up @@ -146,7 +145,6 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/cobra v1.6.1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.2 // indirect
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect
Expand Down
7 changes: 0 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,7 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangplus/bytes v0.0.0-20160111154220-45c989fe5450/go.mod h1:Bk6SMAONeMXrxql8uvOKuAZSu8aM5RUGv+1C6IJaEho=
github.com/golangplus/bytes v1.0.0 h1:YQKBijBVMsBxIiXT4IEhlKR2zHohjEqPole4umyDX+c=
github.com/golangplus/bytes v1.0.0/go.mod h1:AdRaCFwmc/00ZzELMWb01soso6W1R/++O1XL80yAn+A=
github.com/golangplus/fmt v1.0.0 h1:FnUKtw86lXIPfBMc3FimNF3+ABcV+aH5F17OOitTN+E=
github.com/golangplus/fmt v1.0.0/go.mod h1:zpM0OfbMCjPtd2qkTD/jX2MgiFCqklhSUFyDW44gVQE=
github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk=
github.com/golangplus/testing v1.0.0 h1:+ZeeiKZENNOMkTTELoSySazi+XaEhVO0mb+eanrSEUQ=
github.com/golangplus/testing v1.0.0/go.mod h1:ZDreixUV3YzhoVraIDyOzHrr76p6NUh6k/pPg/Q3gYA=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
Expand Down
24 changes: 24 additions & 0 deletions pkg/controller/metrics/names.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

const (
yarnNodeCPUResource = "yarn_node_cpu_resource"
yarnNodeMemoryResource = "yarn_node_memory_resource"
yarnNodeCPUAllocatedResource = "yarn_node_cpu_allocated_resource"
yarnNodeMemoryAllocatedResource = "yarn_node_memory_allocated_resource"
)
7 changes: 0 additions & 7 deletions pkg/controller/metrics/yarn_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ import (
"github.com/koordinator-sh/goyarn/pkg/yarn/cache"
)

const (
yarnNodeCPUResource = "yarn_node_cpu_resource"
yarnNodeMemoryResource = "yarn_node_memory_resource"
yarnNodeCPUAllocatedResource = "yarn_node_cpu_allocated_resource"
yarnNodeMemoryAllocatedResource = "yarn_node_memory_allocated_resource"
)

var (
yarnNodeCPUMetric = prometheus.NewDesc(
yarnNodeCPUResource,
Expand Down
65 changes: 28 additions & 37 deletions pkg/controller/noderesource/resource_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil
}
if yarnNode == nil || yarnNode.Name == "" || yarnNode.Port == 0 {
klog.V(3).Infof("yarn node not exist on node %v, clear yarn allocated resource, detail %+v", req.Name, yarnNode)
if err := r.updateYarnAllocatedResource(node, 0, 0); err != nil {
if err := r.updateYARNAllocatedResource(node, 0, 0); err != nil {
klog.Warningf("failed to clear yarn allocated resource for node %v", req.Name)
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{}, nil
}

// TODO exclude batch pod requested
batchCPU, batchMemory, err := r.GetNodeBatchResource(node)
batchCPU, batchMemory, err := getNodeBatchResource(node)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -100,13 +100,17 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil
if err != nil {
return reconcile.Result{}, err
}
if err := r.updateYarnAllocatedResource(node, core, mb); err != nil {
if err := r.updateYARNAllocatedResource(node, core, mb); err != nil {
return reconcile.Result{}, err
}
return ctrl.Result{}, nil
}

func (r *YARNResourceSyncReconciler) GetNodeBatchResource(node *corev1.Node) (batchCPU resource.Quantity, batchMemory resource.Quantity, err error) {
func getNodeBatchResource(node *corev1.Node) (batchCPU resource.Quantity, batchMemory resource.Quantity, err error) {
if node == nil {
return
}

batchCPU, cpuExist := node.Status.Allocatable[BatchCPU]
batchMemory, memExist := node.Status.Allocatable[BatchMemory]
if !cpuExist {
Expand All @@ -115,18 +119,18 @@ func (r *YARNResourceSyncReconciler) GetNodeBatchResource(node *corev1.Node) (ba
if !memExist {
batchMemory = *resource.NewQuantity(0, resource.BinarySI)
}
if node.Annotations == nil || len(node.Annotations[nodeOriginAllocatableAnnotationKey]) == 0 {
if node.Annotations == nil || len(node.Annotations[NodeOriginAllocatableAnnotationKey]) == 0 {
// koordiantor <= 1.3.0, use node status as origin batch total
return
}

var originAllocatable corev1.ResourceList
err = json.Unmarshal([]byte(node.Annotations[nodeOriginAllocatableAnnotationKey]), &originAllocatable)
originAllocatable, err := GetOriginExtendAllocatable(node.Annotations)
if err != nil {
klog.Warningf("get origin allocatable from node %v annotation failed, error: %v", node.Name, err)
return
}
batchCPU, cpuExist = originAllocatable[BatchCPU]
batchMemory, memExist = originAllocatable[BatchMemory]
batchCPU, cpuExist = originAllocatable.Resources[BatchCPU]
batchMemory, memExist = originAllocatable.Resources[BatchMemory]
if !cpuExist {
batchCPU = *resource.NewQuantity(0, resource.DecimalSI)
}
Expand All @@ -136,31 +140,23 @@ func (r *YARNResourceSyncReconciler) GetNodeBatchResource(node *corev1.Node) (ba
return
}

type ResourceInfo map[string]resource.Quantity

type AllocatedResource map[string]*ResourceInfo

func (r *YARNResourceSyncReconciler) updateYarnAllocatedResource(node *corev1.Node, vcores int32, memoryMB int64) error {
allocatedResource, err := r.GetAllocatedResource(node)
if err != nil {
return err
func (r *YARNResourceSyncReconciler) updateYARNAllocatedResource(node *corev1.Node, vcores int32, memoryMB int64) error {
if node == nil {
return nil
}
cpu := *resource.NewQuantity(int64(vcores), resource.DecimalSI)
memory := *resource.NewQuantity(memoryMB*1024*1024, resource.BinarySI)
allocatedResource["yarnAllocated"] = &ResourceInfo{
string(BatchCPU): cpu,
string(BatchMemory): memory,
nodeAllocated := &NodeAllocated{
YARNAllocated: map[corev1.ResourceName]resource.Quantity{
BatchCPU: *resource.NewQuantity(int64(vcores), resource.DecimalSI),
BatchMemory: *resource.NewQuantity(memoryMB*1024*1024, resource.BinarySI),
},
}
return r.SetAllocatedResource(node, allocatedResource)
}

func (r *YARNResourceSyncReconciler) SetAllocatedResource(node *corev1.Node, resource AllocatedResource) error {
marshal, err := json.Marshal(resource)
nodeAllocatedStr, err := json.Marshal(nodeAllocated)
if err != nil {
return err
}
newNode := node.DeepCopy()
newNode.Annotations[yarnNodeAllocatedResourceAnnotationKey] = string(marshal)
newNode.Annotations[NodeAllocatedResourceAnnotationKey] = string(nodeAllocatedStr)
oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
Expand All @@ -178,14 +174,6 @@ func (r *YARNResourceSyncReconciler) SetAllocatedResource(node *corev1.Node, res
return r.Client.Patch(context.TODO(), newNode, client.RawPatch(types.StrategicMergePatchType, patchBytes))
}

func (r *YARNResourceSyncReconciler) GetAllocatedResource(node *corev1.Node) (AllocatedResource, error) {
if node.GetAnnotations() == nil || node.GetAnnotations()[yarnNodeAllocatedResourceAnnotationKey] == "" {
return map[string]*ResourceInfo{}, nil
}
var res map[string]*ResourceInfo
return res, json.Unmarshal([]byte(node.GetAnnotations()[yarnNodeAllocatedResourceAnnotationKey]), &res)
}

func Add(mgr ctrl.Manager) error {
clients, err := yarnclient.GetAllKnownClients()
if err != nil {
Expand Down Expand Up @@ -213,6 +201,9 @@ func (r *YARNResourceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *YARNResourceSyncReconciler) getYARNNodeManagerPod(node *corev1.Node) (*corev1.Pod, error) {
if node == nil {
return nil, nil
}
opts := []client.ListOption{
client.MatchingLabels{YarnNMComponentLabel: YarnNMComponentValue},
client.MatchingFields{"spec.nodeName": node.Name},
Expand All @@ -225,8 +216,8 @@ func (r *YARNResourceSyncReconciler) getYARNNodeManagerPod(node *corev1.Node) (*
return nil, nil
}
if len(podList.Items) > 1 {
klog.Warningf("get %v node manager pod on node %v, will select the first one", len(podList.Items), node.Name)
return &podList.Items[0], nil
klog.Warningf("get %v node manager pod on node %v, will select the first one %v/%v", len(podList.Items),
node.Name, podList.Items[0].Namespace, podList.Items[0].Name)
}
return &podList.Items[0], nil
}
Expand Down
Loading

0 comments on commit d347d17

Please sign in to comment.