diff --git a/go.mod b/go.mod index 8c8d507e..26dfae83 100644 --- a/go.mod +++ b/go.mod @@ -66,8 +66,8 @@ require ( require ( github.com/gin-gonic/gin v1.8.1 github.com/go-resty/resty/v2 v2.7.0 - github.com/golangplus/testing v1.0.0 github.com/opencontainers/runc v1.1.6 + github.com/stretchr/testify v1.8.2 ) require ( @@ -114,7 +114,6 @@ require ( github.com/godbus/dbus/v5 v5.0.6 // indirect github.com/golang-jwt/jwt/v4 v4.2.0 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/golangplus/bytes v1.0.0 // indirect github.com/google/cadvisor v0.44.1 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect @@ -146,7 +145,6 @@ require ( github.com/sirupsen/logrus v1.8.1 // indirect github.com/spf13/cobra v1.6.1 // indirect github.com/stretchr/objx v0.5.0 // indirect - github.com/stretchr/testify v1.8.2 // indirect github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect github.com/ugorji/go/codec v1.2.7 // indirect github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect diff --git a/go.sum b/go.sum index 4b197671..e9842fc3 100644 --- a/go.sum +++ b/go.sum @@ -398,14 +398,7 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golangplus/bytes v0.0.0-20160111154220-45c989fe5450/go.mod h1:Bk6SMAONeMXrxql8uvOKuAZSu8aM5RUGv+1C6IJaEho= -github.com/golangplus/bytes v1.0.0 h1:YQKBijBVMsBxIiXT4IEhlKR2zHohjEqPole4umyDX+c= -github.com/golangplus/bytes v1.0.0/go.mod h1:AdRaCFwmc/00ZzELMWb01soso6W1R/++O1XL80yAn+A= -github.com/golangplus/fmt v1.0.0 h1:FnUKtw86lXIPfBMc3FimNF3+ABcV+aH5F17OOitTN+E= -github.com/golangplus/fmt v1.0.0/go.mod 
h1:zpM0OfbMCjPtd2qkTD/jX2MgiFCqklhSUFyDW44gVQE= github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk= -github.com/golangplus/testing v1.0.0 h1:+ZeeiKZENNOMkTTELoSySazi+XaEhVO0mb+eanrSEUQ= -github.com/golangplus/testing v1.0.0/go.mod h1:ZDreixUV3YzhoVraIDyOzHrr76p6NUh6k/pPg/Q3gYA= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= diff --git a/pkg/controller/metrics/names.go b/pkg/controller/metrics/names.go new file mode 100644 index 00000000..04f8c575 --- /dev/null +++ b/pkg/controller/metrics/names.go @@ -0,0 +1,24 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package metrics + +const ( + yarnNodeCPUResource = "yarn_node_cpu_resource" + yarnNodeMemoryResource = "yarn_node_memory_resource" + yarnNodeCPUAllocatedResource = "yarn_node_cpu_allocated_resource" + yarnNodeMemoryAllocatedResource = "yarn_node_memory_allocated_resource" +) diff --git a/pkg/controller/metrics/yarn_collector.go b/pkg/controller/metrics/yarn_collector.go index ad15046b..53b61f06 100644 --- a/pkg/controller/metrics/yarn_collector.go +++ b/pkg/controller/metrics/yarn_collector.go @@ -22,13 +22,6 @@ import ( "github.com/koordinator-sh/goyarn/pkg/yarn/cache" ) -const ( - yarnNodeCPUResource = "yarn_node_cpu_resource" - yarnNodeMemoryResource = "yarn_node_memory_resource" - yarnNodeCPUAllocatedResource = "yarn_node_cpu_allocated_resource" - yarnNodeMemoryAllocatedResource = "yarn_node_memory_allocated_resource" -) - var ( yarnNodeCPUMetric = prometheus.NewDesc( yarnNodeCPUResource, diff --git a/pkg/controller/noderesource/resource_sync_controller.go b/pkg/controller/noderesource/resource_sync_controller.go index 00265b58..24b4302a 100644 --- a/pkg/controller/noderesource/resource_sync_controller.go +++ b/pkg/controller/noderesource/resource_sync_controller.go @@ -72,7 +72,7 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil } if yarnNode == nil || yarnNode.Name == "" || yarnNode.Port == 0 { klog.V(3).Infof("yarn node not exist on node %v, clear yarn allocated resource, detail %+v", req.Name, yarnNode) - if err := r.updateYarnAllocatedResource(node, 0, 0); err != nil { + if err := r.updateYARNAllocatedResource(node, 0, 0); err != nil { klog.Warningf("failed to clear yarn allocated resource for node %v", req.Name) return ctrl.Result{Requeue: true}, err } @@ -80,7 +80,7 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil } // TODO exclude batch pod requested - batchCPU, batchMemory, err := r.GetNodeBatchResource(node) + batchCPU, batchMemory, err := getNodeBatchResource(node) if err != 
nil { return ctrl.Result{}, err } @@ -100,13 +100,17 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil if err != nil { return reconcile.Result{}, err } - if err := r.updateYarnAllocatedResource(node, core, mb); err != nil { + if err := r.updateYARNAllocatedResource(node, core, mb); err != nil { return reconcile.Result{}, err } return ctrl.Result{}, nil } -func (r *YARNResourceSyncReconciler) GetNodeBatchResource(node *corev1.Node) (batchCPU resource.Quantity, batchMemory resource.Quantity, err error) { +func getNodeBatchResource(node *corev1.Node) (batchCPU resource.Quantity, batchMemory resource.Quantity, err error) { + if node == nil { + return + } + batchCPU, cpuExist := node.Status.Allocatable[BatchCPU] batchMemory, memExist := node.Status.Allocatable[BatchMemory] if !cpuExist { @@ -115,18 +119,18 @@ func (r *YARNResourceSyncReconciler) GetNodeBatchResource(node *corev1.Node) (ba if !memExist { batchMemory = *resource.NewQuantity(0, resource.BinarySI) } - if node.Annotations == nil || len(node.Annotations[nodeOriginAllocatableAnnotationKey]) == 0 { + if node.Annotations == nil || len(node.Annotations[NodeOriginAllocatableAnnotationKey]) == 0 { // koordiantor <= 1.3.0, use node status as origin batch total return } - var originAllocatable corev1.ResourceList - err = json.Unmarshal([]byte(node.Annotations[nodeOriginAllocatableAnnotationKey]), &originAllocatable) + originAllocatable, err := GetOriginExtendAllocatable(node.Annotations) if err != nil { + klog.Warningf("get origin allocatable from node %v annotation failed, error: %v", node.Name, err) return } - batchCPU, cpuExist = originAllocatable[BatchCPU] - batchMemory, memExist = originAllocatable[BatchMemory] + batchCPU, cpuExist = originAllocatable.Resources[BatchCPU] + batchMemory, memExist = originAllocatable.Resources[BatchMemory] if !cpuExist { batchCPU = *resource.NewQuantity(0, resource.DecimalSI) } @@ -136,31 +140,23 @@ func (r *YARNResourceSyncReconciler) 
GetNodeBatchResource(node *corev1.Node) (ba return } -type ResourceInfo map[string]resource.Quantity - -type AllocatedResource map[string]*ResourceInfo - -func (r *YARNResourceSyncReconciler) updateYarnAllocatedResource(node *corev1.Node, vcores int32, memoryMB int64) error { - allocatedResource, err := r.GetAllocatedResource(node) - if err != nil { - return err +func (r *YARNResourceSyncReconciler) updateYARNAllocatedResource(node *corev1.Node, vcores int32, memoryMB int64) error { + if node == nil { + return nil } - cpu := *resource.NewQuantity(int64(vcores), resource.DecimalSI) - memory := *resource.NewQuantity(memoryMB*1024*1024, resource.BinarySI) - allocatedResource["yarnAllocated"] = &ResourceInfo{ - string(BatchCPU): cpu, - string(BatchMemory): memory, + nodeAllocated := &NodeAllocated{ + YARNAllocated: map[corev1.ResourceName]resource.Quantity{ + BatchCPU: *resource.NewQuantity(int64(vcores), resource.DecimalSI), + BatchMemory: *resource.NewQuantity(memoryMB*1024*1024, resource.BinarySI), + }, } - return r.SetAllocatedResource(node, allocatedResource) -} -func (r *YARNResourceSyncReconciler) SetAllocatedResource(node *corev1.Node, resource AllocatedResource) error { - marshal, err := json.Marshal(resource) + nodeAllocatedStr, err := json.Marshal(nodeAllocated) if err != nil { return err } newNode := node.DeepCopy() - newNode.Annotations[yarnNodeAllocatedResourceAnnotationKey] = string(marshal) + newNode.Annotations[NodeAllocatedResourceAnnotationKey] = string(nodeAllocatedStr) oldData, err := json.Marshal(node) if err != nil { return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err) @@ -178,14 +174,6 @@ func (r *YARNResourceSyncReconciler) SetAllocatedResource(node *corev1.Node, res return r.Client.Patch(context.TODO(), newNode, client.RawPatch(types.StrategicMergePatchType, patchBytes)) } -func (r *YARNResourceSyncReconciler) GetAllocatedResource(node *corev1.Node) (AllocatedResource, error) { - if node.GetAnnotations() == nil || 
node.GetAnnotations()[yarnNodeAllocatedResourceAnnotationKey] == "" { - return map[string]*ResourceInfo{}, nil - } - var res map[string]*ResourceInfo - return res, json.Unmarshal([]byte(node.GetAnnotations()[yarnNodeAllocatedResourceAnnotationKey]), &res) -} - func Add(mgr ctrl.Manager) error { clients, err := yarnclient.GetAllKnownClients() if err != nil { @@ -213,6 +201,9 @@ func (r *YARNResourceSyncReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *YARNResourceSyncReconciler) getYARNNodeManagerPod(node *corev1.Node) (*corev1.Pod, error) { + if node == nil { + return nil, nil + } opts := []client.ListOption{ client.MatchingLabels{YarnNMComponentLabel: YarnNMComponentValue}, client.MatchingFields{"spec.nodeName": node.Name}, @@ -225,8 +216,8 @@ func (r *YARNResourceSyncReconciler) getYARNNodeManagerPod(node *corev1.Node) (* return nil, nil } if len(podList.Items) > 1 { - klog.Warningf("get %v node manager pod on node %v, will select the first one", len(podList.Items), node.Name) - return &podList.Items[0], nil + klog.Warningf("get %v node manager pod on node %v, will select the first one %v/%v", len(podList.Items), + node.Name, podList.Items[0].Namespace, podList.Items[0].Name) } return &podList.Items[0], nil } diff --git a/pkg/controller/noderesource/resource_sync_controller_test.go b/pkg/controller/noderesource/resource_sync_controller_test.go index aa8d7177..228c687d 100644 --- a/pkg/controller/noderesource/resource_sync_controller_test.go +++ b/pkg/controller/noderesource/resource_sync_controller_test.go @@ -18,13 +18,17 @@ package noderesource import ( "context" + "encoding/json" "reflect" "testing" - "github.com/golangplus/testing/assert" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" 
"sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/koordinator-sh/goyarn/pkg/yarn/cache" @@ -101,3 +105,423 @@ func TestYARNResourceSyncReconciler_getYARNNode(t *testing.T) { }) } } + +func Test_getNodeBatchResource(t *testing.T) { + type args struct { + node *corev1.Node + originAllocatable *OriginAllocatable + } + tests := []struct { + name string + args args + wantBatchCPU resource.Quantity + wantBatchMemory resource.Quantity + wantErr bool + }{ + { + name: "get from node status", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{}, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + BatchMemory: *resource.NewQuantity(1024, resource.BinarySI), + }, + }, + }, + }, + wantBatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + wantBatchMemory: *resource.NewQuantity(1024, resource.BinarySI), + wantErr: false, + }, + { + name: "get from node annotation", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + Status: corev1.NodeStatus{}, + }, + originAllocatable: &OriginAllocatable{ + map[corev1.ResourceName]resource.Quantity{ + BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + BatchMemory: *resource.NewQuantity(1024, resource.BinarySI), + }, + }, + }, + wantBatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + wantBatchMemory: *resource.NewQuantity(1024, resource.BinarySI), + wantErr: false, + }, + { + name: "get from node annotation even node status exist", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + BatchCPU: *resource.NewQuantity(2000, resource.DecimalSI), + BatchMemory: *resource.NewQuantity(20484, resource.BinarySI), + }, + }, + }, + originAllocatable: &OriginAllocatable{ + 
map[corev1.ResourceName]resource.Quantity{ + BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + BatchMemory: *resource.NewQuantity(1024, resource.BinarySI), + }, + }, + }, + wantBatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + wantBatchMemory: *resource.NewQuantity(1024, resource.BinarySI), + wantErr: false, + }, + { + name: "return zero with nil node", + args: args{ + node: nil, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.args.originAllocatable != nil && tt.args.node != nil && tt.args.node.Annotations != nil { + originAllocatableStr, err := json.Marshal(tt.args.originAllocatable) + assert.NoError(t, err) + tt.args.node.Annotations[NodeOriginAllocatableAnnotationKey] = string(originAllocatableStr) + } + gotBatchCPU, gotBatchMemory, err := getNodeBatchResource(tt.args.node) + assert.Equal(t, tt.wantErr, err != nil) + assert.Equal(t, tt.wantBatchCPU.MilliValue(), gotBatchCPU.MilliValue()) + assert.Equal(t, tt.wantBatchMemory.MilliValue(), gotBatchMemory.MilliValue()) + }) + } +} + +func TestYARNResourceSyncReconciler_updateYARNAllocatedResource(t *testing.T) { + type args struct { + node *corev1.Node + vcores int32 + memoryMB int64 + } + tests := []struct { + name string + args args + wantAllocated *NodeAllocated + wantErr bool + }{ + { + name: "nil node do nothing", + args: args{}, + wantAllocated: nil, + wantErr: false, + }, + { + name: "update node allocated", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + Annotations: map[string]string{}, + }, + }, + vcores: 1, + memoryMB: 1024, + }, + wantAllocated: &NodeAllocated{ + YARNAllocated: map[corev1.ResourceName]resource.Quantity{ + BatchCPU: resource.MustParse("1"), + BatchMemory: resource.MustParse("1Gi"), + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + assert.NoError(t, 
clientgoscheme.AddToScheme(scheme)) + client := fake.NewClientBuilder().WithScheme(scheme).Build() + r := &YARNResourceSyncReconciler{ + Client: client, + } + if tt.args.node != nil { + assert.NoError(t, r.Client.Create(context.TODO(), tt.args.node)) + } + err := r.updateYARNAllocatedResource(tt.args.node, tt.args.vcores, tt.args.memoryMB) + assert.Equal(t, err != nil, tt.wantErr) + if tt.args.node != nil { + gotNode := &corev1.Node{} + key := types.NamespacedName{Name: tt.args.node.Name} + assert.NoError(t, r.Client.Get(context.TODO(), key, gotNode)) + nodeAllocated, gotErr := GetNodeAllocated(gotNode.Annotations) + assert.Equal(t, tt.wantAllocated, nodeAllocated) + assert.NoError(t, gotErr) + } + }) + } +} + +func TestYARNResourceSyncReconciler_getYARNNodeManagerPod(t *testing.T) { + type args struct { + node *corev1.Node + pods []*corev1.Pod + } + tests := []struct { + name string + args args + want *corev1.Pod + wantErr bool + }{ + { + name: "nil node with empty return", + args: args{ + node: nil, + pods: nil, + }, + want: nil, + wantErr: false, + }, + { + name: "get node manager pod", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nm-pod", + Labels: map[string]string{ + YarnNMComponentLabel: YarnNMComponentValue, + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + }, + }, + want: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nm-pod", + Labels: map[string]string{ + YarnNMComponentLabel: YarnNMComponentValue, + }, + ResourceVersion: "1", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + wantErr: false, + }, + { + name: "node manager node found because of node label", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nm-pod", + Labels: map[string]string{}, + }, + Spec: 
corev1.PodSpec{ + NodeName: "test-node", + }, + }, + }, + }, + want: nil, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + assert.NoError(t, clientgoscheme.AddToScheme(scheme)) + client := fake.NewClientBuilder().WithScheme(scheme).Build() + r := &YARNResourceSyncReconciler{ + Client: client, + } + if tt.args.node != nil { + assert.NoError(t, r.Client.Create(context.TODO(), tt.args.node)) + } + for _, pod := range tt.args.pods { + assert.NoError(t, r.Client.Create(context.TODO(), pod)) + } + got, err := r.getYARNNodeManagerPod(tt.args.node) + assert.Equal(t, tt.wantErr, err != nil) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestYARNResourceSyncReconciler_getYARNNode1(t *testing.T) { + type args struct { + node *corev1.Node + pods []*corev1.Pod + } + tests := []struct { + name string + args args + want *cache.YarnNode + wantErr bool + }{ + { + name: "nil node return empty", + args: args{ + node: nil, + pods: nil, + }, + want: nil, + wantErr: false, + }, + { + name: "get yarn node", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nm-pod", + Labels: map[string]string{ + YarnNMComponentLabel: YarnNMComponentValue, + }, + Annotations: map[string]string{ + YarnNodeIdAnnotation: "test-yarn-node-id:8042", + YarnClusterIDAnnotation: "test-yarn-cluster-id", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + }, + }, + want: &cache.YarnNode{ + Name: "test-yarn-node-id", + Port: 8042, + ClusterID: "test-yarn-cluster-id", + }, + wantErr: false, + }, + { + name: "yarn node id not exist", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nm-pod", + Labels: map[string]string{ + YarnNMComponentLabel: YarnNMComponentValue, + }, + }, 
+ Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "yarn node id bad format", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nm-pod", + Labels: map[string]string{ + YarnNMComponentLabel: YarnNMComponentValue, + }, + Annotations: map[string]string{ + YarnNodeIdAnnotation: "test-yarn-node-id-8042", + YarnClusterIDAnnotation: "test-yarn-cluster-id", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + }, + }, + want: nil, + wantErr: true, + }, + { + name: "yarn node id port bad format", + args: args{ + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + }, + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-nm-pod", + Labels: map[string]string{ + YarnNMComponentLabel: YarnNMComponentValue, + }, + Annotations: map[string]string{ + YarnNodeIdAnnotation: "test-yarn-node-id:bad-port", + YarnClusterIDAnnotation: "test-yarn-cluster-id", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + }, + }, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + assert.NoError(t, clientgoscheme.AddToScheme(scheme)) + client := fake.NewClientBuilder().WithScheme(scheme).Build() + r := &YARNResourceSyncReconciler{ + Client: client, + } + if tt.args.node != nil { + assert.NoError(t, r.Client.Create(context.TODO(), tt.args.node)) + } + for _, pod := range tt.args.pods { + assert.NoError(t, r.Client.Create(context.TODO(), pod)) + } + got, err := r.getYARNNode(tt.args.node) + assert.Equal(t, tt.wantErr, err != nil) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/controller/noderesource/yarn_resource.go b/pkg/controller/noderesource/yarn_resource.go index 7f0dc903..53d444bb 100644 --- 
a/pkg/controller/noderesource/yarn_resource.go +++ b/pkg/controller/noderesource/yarn_resource.go @@ -19,8 +19,12 @@ limitations under the License. package noderesource import ( - "github.com/koordinator-sh/koordinator/apis/extension" + "encoding/json" + + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + + "github.com/koordinator-sh/koordinator/apis/extension" ) const ( @@ -30,11 +34,44 @@ const ( YarnClusterIDAnnotation = "yarn.hadoop.apache.org/cluster-id" // TODO mv to koordinator/api - yarnNodeAllocatedResourceAnnotationKey = "node.yarn.koordinator.sh/resourceAllocated" - nodeOriginAllocatableAnnotationKey = "node.koordinator.sh/originAllocatable" + NodeOriginAllocatableAnnotationKey = "node.koordinator.sh/originAllocatable" + NodeAllocatedResourceAnnotationKey = "node.koordinator.sh/resourceAllocated" ) func calculate(batchCPU resource.Quantity, batchMemory resource.Quantity) (int64, int64) { // TODO multiple ratio as buffer return batchCPU.ScaledValue(resource.Kilo), batchMemory.ScaledValue(resource.Mega) } +// TODO mv to koordinator api +type OriginAllocatable struct { + Resources corev1.ResourceList `json:"resources,omitempty"` +} + +func GetOriginExtendAllocatable(annotations map[string]string) (*OriginAllocatable, error) { + originAllocatableStr, exist := annotations[NodeOriginAllocatableAnnotationKey] + if !exist { + return nil, nil + } + originAllocatable := &OriginAllocatable{} + if err := json.Unmarshal([]byte(originAllocatableStr), originAllocatable); err != nil { + return nil, err + } + return originAllocatable, nil +} + +type NodeAllocated struct { + YARNAllocated corev1.ResourceList `json:"yarnAllocated,omitempty"` +} + +func GetNodeAllocated(annotations map[string]string) (*NodeAllocated, error) { + nodeAllocatedStr, exist := annotations[NodeAllocatedResourceAnnotationKey] + if !exist { + return nil, nil + } + nodeAllocated := &NodeAllocated{} + if err := json.Unmarshal([]byte(nodeAllocatedStr), nodeAllocated); err != nil { 
+ return nil, err + } + return nodeAllocated, nil +}