From ff03eb40a5c710f00be80e56dec59af9c18a3260 Mon Sep 17 00:00:00 2001 From: zwzhang Date: Tue, 14 Nov 2023 20:11:44 +0800 Subject: [PATCH] yarn-operator: fix get cluster node failed when one of rm failed (#46) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 佑祎 --- .licenseignore | 4 +- go.mod | 3 +- hack/mock-gen.sh | 31 ++ .../noderesource/resource_sync_controller.go | 55 +-- .../resource_sync_controller_test.go | 363 ++++++++++++++++-- pkg/controller/noderesource/yarn_resource.go | 152 +++++++- .../noderesource/yarn_resource_test.go | 136 +++++++ .../apis/service/applicationclient_service.go | 16 + pkg/yarn/apis/service/ha_service.go | 16 + .../resourcemanager_administration_service.go | 20 +- pkg/yarn/cache/nodes_syncer.go | 13 +- pkg/yarn/client/client.go | 57 ++- .../rm-get-cluster-nodes-with-ha/main.go | 4 +- .../rm-update-node-resource-with-ha/main.go | 4 +- pkg/yarn/client/factory.go | 93 +++++ pkg/yarn/client/mockclient/mock_client.go | 123 ++++++ pkg/yarn/client/mockclient/mock_factory.go | 97 +++++ pkg/yarn/client/utils.go | 62 --- 18 files changed, 1064 insertions(+), 185 deletions(-) create mode 100755 hack/mock-gen.sh create mode 100644 pkg/controller/noderesource/yarn_resource_test.go create mode 100644 pkg/yarn/client/factory.go create mode 100644 pkg/yarn/client/mockclient/mock_client.go create mode 100644 pkg/yarn/client/mockclient/mock_factory.go delete mode 100644 pkg/yarn/client/utils.go diff --git a/.licenseignore b/.licenseignore index 885db484..6e1b402c 100644 --- a/.licenseignore +++ b/.licenseignore @@ -1,4 +1,6 @@ vendor -pkg/yarn/apis +pkg/yarn/apis/auth +pkg/yarn/apis/proto +pkg/yarn/apis/security pkg/yarn/client/ipc pkg/yarn/config diff --git a/go.mod b/go.mod index 26dfae83..eb70be13 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.24.2 // indirect - k8s.io/apiserver v0.26.0 // indirect + k8s.io/apiserver v0.26.0 k8s.io/component-base v0.26.0 // indirect k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect sigs.k8s.io/scheduler-plugins v0.24.15 // indirect @@ -66,6 +66,7 @@ require ( require ( github.com/gin-gonic/gin v1.8.1 github.com/go-resty/resty/v2 v2.7.0 + github.com/golang/mock v1.6.0 github.com/opencontainers/runc v1.1.6 github.com/stretchr/testify v1.8.2 ) diff --git a/hack/mock-gen.sh b/hack/mock-gen.sh new file mode 100755 index 00000000..34dfee5b --- /dev/null +++ b/hack/mock-gen.sh @@ -0,0 +1,31 @@ +#!/bin/bash +# +# Copyright 2022-2023 The Koordinator Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +set -e + +SHELL_FOLDER=$(cd "$(dirname "$0")";pwd) +LICENSE_HEADER_PATH="./hack/boilerplate/boilerplate.go.txt" + +cd $GOPATH/src/github.com/koordinator-sh/goyarn + +# generates gomock files +mockgen -source pkg/yarn/client/factory.go \ + -destination pkg/yarn/client/mockclient/mock_factory.go \ + -copyright_file ${LICENSE_HEADER_PATH} +mockgen -source pkg/yarn/client/client.go \ + -destination pkg/yarn/client/mockclient/mock_client.go \ + -copyright_file ${LICENSE_HEADER_PATH} diff --git a/pkg/controller/noderesource/resource_sync_controller.go b/pkg/controller/noderesource/resource_sync_controller.go index 24b4302a..ede8c4bf 100644 --- a/pkg/controller/noderesource/resource_sync_controller.go +++ b/pkg/controller/noderesource/resource_sync_controller.go @@ -48,8 +48,8 @@ const ( type YARNResourceSyncReconciler struct { client.Client - yarnClient *yarnclient.YarnClient - yarnClients map[string]*yarnclient.YarnClient + yarnClient yarnclient.YarnClient + yarnClients map[string]yarnclient.YarnClient yarnNodeCache *cache.NodesSyncer } @@ -119,18 +119,16 @@ func getNodeBatchResource(node *corev1.Node) (batchCPU resource.Quantity, batchM if !memExist { batchMemory = *resource.NewQuantity(0, resource.BinarySI) } - if node.Annotations == nil || len(node.Annotations[NodeOriginAllocatableAnnotationKey]) == 0 { + originAllocatableRes, err := GetOriginExtendedAllocatableRes(node.Annotations) + if err == nil && originAllocatableRes == nil { // koordiantor <= 1.3.0, use node status as origin batch total return - } - - originAllocatable, err := GetOriginExtendAllocatable(node.Annotations) - if err != nil { + } else if err != nil { klog.Warningf("get origin allocatable from node %v annotation failed, error: %v", node.Name, err) return } - batchCPU, cpuExist = originAllocatable.Resources[BatchCPU] - batchMemory, memExist = originAllocatable.Resources[BatchMemory] + batchCPU, cpuExist = originAllocatableRes[BatchCPU] + batchMemory, memExist = originAllocatableRes[BatchMemory] if !cpuExist { batchCPU = *resource.NewQuantity(0, resource.DecimalSI) } @@ -144,19 +142,11 @@ func (r *YARNResourceSyncReconciler) updateYARNAllocatedResource(node *corev1.No if node == nil { return nil } - nodeAllocated := &NodeAllocated{ - YARNAllocated: map[corev1.ResourceName]resource.Quantity{ - BatchCPU: *resource.NewQuantity(int64(vcores), resource.DecimalSI), - BatchMemory: *resource.NewQuantity(memoryMB*1024*1024, resource.BinarySI), - }, - } - - nodeAllocatedStr, err := json.Marshal(nodeAllocated) - if err != nil { + newNode := node.DeepCopy() + if err := SetYARNAllocatedResource(newNode.Annotations, vcores, memoryMB); err != nil { return err } - newNode := node.DeepCopy() - newNode.Annotations[NodeAllocatedResourceAnnotationKey] = string(nodeAllocatedStr) + oldData, err := json.Marshal(node) if err != nil { return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err) @@ -175,7 +165,7 @@ func (r *YARNResourceSyncReconciler) updateYARNAllocatedResource(node *corev1.No } func Add(mgr ctrl.Manager) error { - clients, err := yarnclient.GetAllKnownClients() + clients, err := yarnclient.DefaultYarnClientFactory.CreateAllYarnClients() if err != nil { return err } @@ -251,13 +241,16 @@ func (r *YARNResourceSyncReconciler) getYARNNode(node *corev1.Node) (*cache.Yarn Name: tokens[0], Port: int32(port), } - if clusterID, exist := nmPod.Annotations[YarnClusterIDAnnotation]; exist { + if clusterID, exist := nmPod.Annotations[PodYarnClusterIDAnnotationKey]; exist { yarnNode.ClusterID = clusterID } return yarnNode, nil } func (r *YARNResourceSyncReconciler) updateYARNNodeResource(yarnNode *cache.YarnNode, vcores, memoryMB int64) error { + if yarnNode == nil { + return nil + } request := &yarnserver.UpdateNodeResourceRequestProto{ NodeResourceMap: []*hadoopyarn.NodeResourceMapProto{ { @@ -275,7 +268,7 @@ func (r *YARNResourceSyncReconciler) updateYARNNodeResource(yarnNode *cache.Yarn }, } yarnClient, err := r.getYARNClient(yarnNode) - if err != nil { + if err != nil || yarnClient == nil { return err } if resp, err := yarnClient.UpdateNodeResource(request); err != nil { @@ -285,11 +278,13 @@ func (r *YARNResourceSyncReconciler) updateYARNNodeResource(yarnNode *cache.Yarn return nil } -func (r *YARNResourceSyncReconciler) getYARNClient(yarnNode *cache.YarnNode) (*yarnclient.YarnClient, error) { - if yarnNode.ClusterID == "" && r.yarnClient != nil { +func (r *YARNResourceSyncReconciler) getYARNClient(yarnNode *cache.YarnNode) (yarnclient.YarnClient, error) { + if yarnNode == nil { + return nil, nil + } else if yarnNode.ClusterID == "" && r.yarnClient != nil { return r.yarnClient, nil } else if yarnNode.ClusterID == "" && r.yarnClient == nil { - yarnClient, err := yarnclient.CreateYarnClient() + yarnClient, err := yarnclient.DefaultYarnClientFactory.CreateDefaultYarnClient() if err != nil { return nil, err } @@ -302,15 +297,21 @@ func (r *YARNResourceSyncReconciler) getYARNClient(yarnNode *cache.YarnNode) (*y return clusterClient, nil } // create new client by cluster id - clusterClient, err := yarnclient.CreateYarnClientByClusterID(yarnNode.ClusterID) + clusterClient, err := yarnclient.DefaultYarnClientFactory.CreateYarnClientByClusterID(yarnNode.ClusterID) if err != nil { return nil, err } + if r.yarnClients == nil { + r.yarnClients = map[string]yarnclient.YarnClient{} + } r.yarnClients[yarnNode.ClusterID] = clusterClient return clusterClient, nil } func (r *YARNResourceSyncReconciler) getYARNNodeAllocatedResource(yarnNode *cache.YarnNode) (vcores int32, memoryMB int64, err error) { + if yarnNode == nil { + return 0, 0, nil + } nodeResource, exist := r.yarnNodeCache.GetNodeResource(yarnNode) if !exist { return 0, 0, nil diff --git a/pkg/controller/noderesource/resource_sync_controller_test.go b/pkg/controller/noderesource/resource_sync_controller_test.go index 228c687d..f0bf7c24 100644 --- a/pkg/controller/noderesource/resource_sync_controller_test.go +++ b/pkg/controller/noderesource/resource_sync_controller_test.go @@ -18,7 +18,8 @@ package noderesource import ( "context" - "encoding/json" + "fmt" + "github.com/golang/mock/gomock" "reflect" "testing" @@ -32,6 +33,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/koordinator-sh/goyarn/pkg/yarn/cache" + yarnclient "github.com/koordinator-sh/goyarn/pkg/yarn/client" + "github.com/koordinator-sh/goyarn/pkg/yarn/client/mockclient" ) func TestYARNResourceSyncReconciler_getYARNNode(t *testing.T) { @@ -60,8 +63,8 @@ func TestYARNResourceSyncReconciler_getYARNNode(t *testing.T) { YarnNMComponentLabel: YarnNMComponentValue, }, Annotations: map[string]string{ - YarnNodeIdAnnotation: "test-nm-id:8041", - YarnClusterIDAnnotation: "test-cluster-id", + YarnNodeIdAnnotation: "test-nm-id:8041", + PodYarnClusterIDAnnotationKey: "test-cluster-id", }, }, Spec: corev1.PodSpec{ @@ -109,7 +112,7 @@ func TestYARNResourceSyncReconciler_getYARNNode(t *testing.T) { func Test_getNodeBatchResource(t *testing.T) { type args struct { node *corev1.Node - originAllocatable *OriginAllocatable + originAllocatable corev1.ResourceList } tests := []struct { name string @@ -144,11 +147,9 @@ func Test_getNodeBatchResource(t *testing.T) { }, Status: corev1.NodeStatus{}, }, - originAllocatable: &OriginAllocatable{ - map[corev1.ResourceName]resource.Quantity{ - BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), - BatchMemory: *resource.NewQuantity(1024, resource.BinarySI), - }, + originAllocatable: corev1.ResourceList{ + BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + BatchMemory: *resource.NewQuantity(1024, resource.BinarySI), }, }, wantBatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), @@ -169,11 +170,9 @@ func Test_getNodeBatchResource(t *testing.T) { }, }, }, - originAllocatable: &OriginAllocatable{ - map[corev1.ResourceName]resource.Quantity{ - BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), - BatchMemory: *resource.NewQuantity(1024, resource.BinarySI), - }, + originAllocatable: corev1.ResourceList{ + BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), + BatchMemory: *resource.NewQuantity(1024, resource.BinarySI), }, }, wantBatchCPU: *resource.NewQuantity(1000, resource.DecimalSI), @@ -191,9 +190,8 @@ func Test_getNodeBatchResource(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.args.originAllocatable != nil && tt.args.node != nil && tt.args.node.Annotations != nil { - originAllocatableStr, err := json.Marshal(tt.args.originAllocatable) + err := SetOriginExtendedAllocatableRes(tt.args.node.Annotations, tt.args.originAllocatable) assert.NoError(t, err) - tt.args.node.Annotations[NodeOriginAllocatableAnnotationKey] = string(originAllocatableStr) } gotBatchCPU, gotBatchMemory, err := getNodeBatchResource(tt.args.node) assert.Equal(t, tt.wantErr, err != nil) @@ -212,7 +210,7 @@ func TestYARNResourceSyncReconciler_updateYARNAllocatedResource(t *testing.T) { tests := []struct { name string args args - wantAllocated *NodeAllocated + wantAllocated corev1.ResourceList wantErr bool }{ { @@ -233,11 +231,9 @@ func TestYARNResourceSyncReconciler_updateYARNAllocatedResource(t *testing.T) { vcores: 1, memoryMB: 1024, }, - wantAllocated: &NodeAllocated{ - YARNAllocated: map[corev1.ResourceName]resource.Quantity{ - BatchCPU: resource.MustParse("1"), - BatchMemory: resource.MustParse("1Gi"), - }, + wantAllocated: corev1.ResourceList{ + BatchCPU: resource.MustParse("1k"), + BatchMemory: resource.MustParse("1Gi"), }, wantErr: false, }, @@ -259,7 +255,7 @@ func TestYARNResourceSyncReconciler_updateYARNAllocatedResource(t *testing.T) { gotNode := &corev1.Node{} key := types.NamespacedName{Name: tt.args.node.Name} assert.NoError(t, r.Client.Get(context.TODO(), key, gotNode)) - nodeAllocated, gotErr := GetNodeAllocated(gotNode.Annotations) + nodeAllocated, gotErr := GetYARNAllocatedResource(gotNode.Annotations) assert.Equal(t, tt.wantAllocated, nodeAllocated) assert.NoError(t, gotErr) } @@ -404,8 +400,8 @@ func TestYARNResourceSyncReconciler_getYARNNode1(t *testing.T) { YarnNMComponentLabel: YarnNMComponentValue, }, Annotations: map[string]string{ - YarnNodeIdAnnotation: "test-yarn-node-id:8042", - YarnClusterIDAnnotation: "test-yarn-cluster-id", + YarnNodeIdAnnotation: "test-yarn-node-id:8042", + PodYarnClusterIDAnnotationKey: "test-yarn-cluster-id", }, }, Spec: corev1.PodSpec{ @@ -462,8 +458,8 @@ func TestYARNResourceSyncReconciler_getYARNNode1(t *testing.T) { YarnNMComponentLabel: YarnNMComponentValue, }, Annotations: map[string]string{ - YarnNodeIdAnnotation: "test-yarn-node-id-8042", - YarnClusterIDAnnotation: "test-yarn-cluster-id", + YarnNodeIdAnnotation: "test-yarn-node-id-8042", + PodYarnClusterIDAnnotationKey: "test-yarn-cluster-id", }, }, Spec: corev1.PodSpec{ @@ -491,8 +487,8 @@ func TestYARNResourceSyncReconciler_getYARNNode1(t *testing.T) { YarnNMComponentLabel: YarnNMComponentValue, }, Annotations: map[string]string{ - YarnNodeIdAnnotation: "test-yarn-node-id:bad-port", - YarnClusterIDAnnotation: "test-yarn-cluster-id", + YarnNodeIdAnnotation: "test-yarn-node-id:bad-port", + PodYarnClusterIDAnnotationKey: "test-yarn-cluster-id", }, }, Spec: corev1.PodSpec{ @@ -525,3 +521,312 @@ func TestYARNResourceSyncReconciler_getYARNNode1(t *testing.T) { }) } } + +func TestYARNResourceSyncReconciler_getYARNClient(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + yarnClient := mock_client.NewMockYarnClient(ctrl) + yarnCluster1Client := mock_client.NewMockYarnClient(ctrl) + yarnClients := map[string]yarnclient.YarnClient{ + "test-cluster1": yarnCluster1Client, + } + + type fields struct { + yarnClientOfReconciler yarnclient.YarnClient + yarnClientsOfReconciler map[string]yarnclient.YarnClient + yarnClientFromFactory yarnclient.YarnClient + yarnClientsFromFactory map[string]yarnclient.YarnClient + } + type args struct { + yarnNode *cache.YarnNode + factoryCreateDefaultError error + factoryCreateClusterError map[string]error + } + tests := []struct { + name string + fields fields + args args + want yarnclient.YarnClient + wantErr bool + }{ + { + name: "return nil with nil node", + fields: fields{}, + args: args{ + yarnNode: nil, + }, + want: nil, + wantErr: false, + }, + { + name: "get default client by empty cluster id", + fields: fields{ + yarnClientOfReconciler: yarnClient, + }, + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-node", + Port: 8042, + ClusterID: "", + }, + }, + want: yarnClient, + wantErr: false, + }, + { + name: "get default client by empty cluster id from new", + fields: fields{ + yarnClientFromFactory: yarnClient, + }, + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-node", + Port: 8042, + ClusterID: "", + }, + }, + want: yarnClient, + wantErr: false, + }, + { + name: "get default client by empty cluster id from new with error", + fields: fields{ + yarnClientFromFactory: yarnClient, + }, + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-node", + Port: 8042, + ClusterID: "", + }, + factoryCreateDefaultError: fmt.Errorf("create default error"), + }, + want: nil, + wantErr: true, + }, + { + name: "get cluster client from reconciler", + fields: fields{ + yarnClientsOfReconciler: yarnClients, + }, + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-node", + Port: 8042, + ClusterID: "test-cluster1", + }, + }, + want: yarnCluster1Client, + wantErr: false, + }, + { + name: "get cluster client from new", + fields: fields{ + yarnClientsFromFactory: yarnClients, + }, + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-node", + Port: 8042, + ClusterID: "test-cluster1", + }, + }, + want: yarnCluster1Client, + wantErr: false, + }, + { + name: "get cluster client from new with error", + fields: fields{ + yarnClientsFromFactory: yarnClients, + }, + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-node", + Port: 8042, + ClusterID: "test-cluster1", + }, + factoryCreateClusterError: map[string]error{ + "test-cluster1": fmt.Errorf("create default error"), + }, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockYarnClientFactory := mock_client.NewMockYarnClientFactory(ctrl) + yarnclient.DefaultYarnClientFactory = mockYarnClientFactory + + if tt.fields.yarnClientFromFactory != nil { + mockYarnClientFactory.EXPECT().CreateDefaultYarnClient().Return(tt.fields.yarnClientFromFactory, tt.args.factoryCreateDefaultError) + } + for clusterID, client := range tt.fields.yarnClientsFromFactory { + createError := tt.args.factoryCreateClusterError[clusterID] + mockYarnClientFactory.EXPECT().CreateYarnClientByClusterID(clusterID).Return(client, createError) + } + r := &YARNResourceSyncReconciler{ + yarnClient: tt.fields.yarnClientOfReconciler, + yarnClients: tt.fields.yarnClientsOfReconciler, + } + got, err := r.getYARNClient(tt.args.yarnNode) + if (err != nil) != tt.wantErr { + t.Errorf("getYARNClient() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getYARNClient() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestYARNResourceSyncReconciler_updateYARNNodeResource(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + type fields struct { + yarnClientErrorFromFactory error + doUpdate bool + updateNodeResourceError error + doReinit bool + reinitError error + } + type args struct { + yarnNode *cache.YarnNode + vcores int64 + memoryMB int64 + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "nil node return nil", + fields: fields{ + doUpdate: false, + }, + args: args{}, + wantErr: false, + }, + { + name: "get client failed", + fields: fields{ + yarnClientErrorFromFactory: fmt.Errorf("create client error"), + }, + args: args{ + yarnNode: &cache.YarnNode{}, + }, + wantErr: true, + }, + { + name: "update succ", + fields: fields{ + doUpdate: true, + }, + args: args{ + yarnNode: &cache.YarnNode{}, + }, + wantErr: false, + }, + { + name: "update failed", + fields: fields{ + doUpdate: true, + updateNodeResourceError: fmt.Errorf("update error"), + doReinit: true, + }, + args: args{ + yarnNode: &cache.YarnNode{}, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockYarnClientFactory := mock_client.NewMockYarnClientFactory(ctrl) + yarnclient.DefaultYarnClientFactory = mockYarnClientFactory + yarnClient := mock_client.NewMockYarnClient(ctrl) + if tt.args.yarnNode != nil { + mockYarnClientFactory.EXPECT().CreateDefaultYarnClient().Return(yarnClient, tt.fields.yarnClientErrorFromFactory) + } + if tt.fields.doUpdate { + yarnClient.EXPECT().UpdateNodeResource(gomock.Any()).Return(nil, tt.fields.updateNodeResourceError) + } + if tt.fields.doReinit { + yarnClient.EXPECT().Reinitialize().Return(tt.fields.reinitError) + } + + r := &YARNResourceSyncReconciler{} + if err := r.updateYARNNodeResource(tt.args.yarnNode, tt.args.vcores, tt.args.memoryMB); (err != nil) != tt.wantErr { + t.Errorf("updateYARNNodeResource() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestYARNResourceSyncReconciler_getYARNNodeAllocatedResource(t *testing.T) { + type args struct { + yarnNode *cache.YarnNode + } + tests := []struct { + name string + args args + wantVcores int32 + wantMemoryMB int64 + wantErr bool + }{ + { + name: "nil node return nothing", + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-yarn-node", + Port: 8041, + }, + }, + wantVcores: 0, + wantMemoryMB: 0, + wantErr: false, + }, + { + name: "get yarn node not exist", + args: args{ + yarnNode: &cache.YarnNode{ + Name: "test-yarn-node", + Port: 8041, + }, + }, + wantVcores: 0, + wantMemoryMB: 0, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockYarnClientFactory := mock_client.NewMockYarnClientFactory(ctrl) + yarnclient.DefaultYarnClientFactory = mockYarnClientFactory + yarnClient := mock_client.NewMockYarnClient(ctrl) + yarnNodeCache := cache.NewNodesSyncer(map[string]yarnclient.YarnClient{"default": yarnClient}) + + r := &YARNResourceSyncReconciler{ + yarnNodeCache: yarnNodeCache, + } + gotVcores, gotMemoryMB, err := r.getYARNNodeAllocatedResource(tt.args.yarnNode) + if (err != nil) != tt.wantErr { + t.Errorf("getYARNNodeAllocatedResource() error = %v, wantErr %v", err, tt.wantErr) + return + } + if gotVcores != tt.wantVcores { + t.Errorf("getYARNNodeAllocatedResource() gotVcores = %v, want %v", gotVcores, tt.wantVcores) + } + if gotMemoryMB != tt.wantMemoryMB { + t.Errorf("getYARNNodeAllocatedResource() gotMemoryMB = %v, want %v", gotMemoryMB, tt.wantMemoryMB) + } + }) + } +} diff --git a/pkg/controller/noderesource/yarn_resource.go b/pkg/controller/noderesource/yarn_resource.go index 53d444bb..c80cb462 100644 --- a/pkg/controller/noderesource/yarn_resource.go +++ b/pkg/controller/noderesource/yarn_resource.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + quotav1 "k8s.io/apiserver/pkg/quota/v1" "github.com/koordinator-sh/koordinator/apis/extension" ) @@ -31,11 +32,9 @@ const ( BatchCPU = extension.BatchCPU BatchMemory = extension.BatchMemory - YarnClusterIDAnnotation = "yarn.hadoop.apache.org/cluster-id" - - // TODO mv to koordinator/api - NodeOriginAllocatableAnnotationKey = "node.koordinator.sh/originAllocatable" - NodeAllocatedResourceAnnotationKey = "node.koordinator.sh/resourceAllocated" + PodYarnClusterIDAnnotationKey = "yarn.hadoop.apache.org/cluster-id" + YARNAllocationName = "hadoop-yarn" + YARNResourcePriority = extension.PriorityBatch ) func calculate(batchCPU resource.Quantity, batchMemory resource.Quantity) (int64, int64) { @@ -43,13 +42,53 @@ func calculate(batchCPU resource.Quantity, batchMemory resource.Quantity) (int64 return batchCPU.ScaledValue(resource.Kilo), batchMemory.ScaledValue(resource.Mega) } -// TODO mv to koordiantor api +func GetOriginExtendedAllocatableRes(annotations map[string]string) (corev1.ResourceList, error) { + originAllocatable, err := GetOriginExtendedAllocatable(annotations) + if originAllocatable == nil || err != nil { + return nil, err + } + return originAllocatable.Resources, nil +} + +func SetYARNAllocatedResource(annotations map[string]string, vcores int32, memoryMB int64) error { + resources := map[corev1.ResourceName]resource.Quantity{ + BatchCPU: *resource.NewQuantity(int64(vcores*1000), resource.DecimalSI), + BatchMemory: *resource.NewQuantity(memoryMB*1024*1024, resource.BinarySI), + } + return SetThirdPartyAllocation(annotations, YARNAllocationName, YARNResourcePriority, resources) +} + +func GetYARNAllocatedResource(annotations map[string]string) (corev1.ResourceList, error) { + thirdPartyAllocation, err := GetThirdPartyAllocations(annotations) + if thirdPartyAllocation == nil || err != nil { + return nil, err + } + for _, alloc := range thirdPartyAllocation.Allocations { + if alloc.Name == YARNAllocationName { + return alloc.Resources, nil + } + } + return nil, nil +} + +// TODO mv the followings to koordiantor api +const ( + // batch resource can be shared with other allocators such as Hadoop YARN + // record origin batch allocatable on node for calculating the batch allocatable of K8s and YARN, e.g. + // k8s_batch_allocatable = origin_batch_allocatable - yarn_batch_requested + // yarn_allocatable = origin_batch_allocatable - k8s_batch_requested + NodeOriginExtendedAllocatableAnnotationKey = "node.koordinator.sh/originExtendedAllocatable" + + // record (batch) allocations of other schedulers such as YARN, which should be excluded before updating node extended resource + NodeThirdPartyAllocationsAnnotationKey = "node.koordinator.sh/thirdPartyAllocations" +) + type OriginAllocatable struct { Resources corev1.ResourceList `json:"resources,omitempty"` } -func GetOriginExtendAllocatable(annotations map[string]string) (*OriginAllocatable, error) { - originAllocatableStr, exist := annotations[NodeOriginAllocatableAnnotationKey] +func GetOriginExtendedAllocatable(annotations map[string]string) (*OriginAllocatable, error) { + originAllocatableStr, exist := annotations[NodeOriginExtendedAllocatableAnnotationKey] if !exist { return nil, nil } @@ -60,18 +99,101 @@ func GetOriginExtendAllocatable(annotations map[string]string) (*OriginAllocatab return originAllocatable, nil } -type NodeAllocated struct { - YARNAllocated corev1.ResourceList `json:"yarnAllocated,omitempty"` +func SetOriginExtendedAllocatableRes(annotations map[string]string, extendedAllocatable corev1.ResourceList) error { + old, err := GetOriginExtendedAllocatable(annotations) + if old == nil || err != nil { + old = &OriginAllocatable{} + } + if old.Resources == nil { + old.Resources = map[corev1.ResourceName]resource.Quantity{} + } + for resourceName, value := range extendedAllocatable { + old.Resources[resourceName] = value + } + newStr, err := json.Marshal(old) + if err != nil { + return err + } + if annotations == nil { + annotations = map[string]string{} + } + annotations[NodeOriginExtendedAllocatableAnnotationKey] = string(newStr) + return nil +} + +type ThirdPartyAllocations struct { + Allocations []ThirdPartyAllocation `json:"allocations,omitempty"` +} + +type ThirdPartyAllocation struct { + Name string `json:"name"` + Priority extension.PriorityClass `json:"priority"` + Resources corev1.ResourceList `json:"resources,omitempty"` } -func GetNodeAllocated(annotations map[string]string) (*NodeAllocated, error) { - nodeAllocatedStr, exist := annotations[NodeAllocatedResourceAnnotationKey] +func GetThirdPartyAllocations(annotations map[string]string) (*ThirdPartyAllocations, error) { + valueStr, exist := annotations[NodeThirdPartyAllocationsAnnotationKey] if !exist { return nil, nil } - nodeAllocated := &NodeAllocated{} - if err := json.Unmarshal([]byte(nodeAllocatedStr), nodeAllocated); err != nil { + object := &ThirdPartyAllocations{} + if err := json.Unmarshal([]byte(valueStr), object); err != nil { return nil, err } - return nodeAllocated, nil + return object, nil +} + +func GetThirdPartyAllocatedResByPriority(annotations map[string]string, priority extension.PriorityClass) (corev1.ResourceList, error) { + allocations, err := GetThirdPartyAllocations(annotations) + if err != nil || allocations == nil { + return nil, err + } + result := corev1.ResourceList{} + for _, alloc := range allocations.Allocations { + if alloc.Priority == priority { + result = quotav1.Add(result, alloc.Resources) + } + } + return result, nil +} + +func SetThirdPartyAllocation(annotations map[string]string, name string, priority extension.PriorityClass, + resource corev1.ResourceList) error { + // parse or init old allocations + oldAllocations, err := GetThirdPartyAllocations(annotations) + if oldAllocations == nil || err != nil { + oldAllocations = &ThirdPartyAllocations{} + } + if oldAllocations.Allocations == nil { + oldAllocations.Allocations = make([]ThirdPartyAllocation, 0, 1) + } + + // create or update old alloc + newAlloc := ThirdPartyAllocation{ + Name: name, + Priority: priority, + Resources: resource, + } + exist := false + for i := range oldAllocations.Allocations { + if oldAllocations.Allocations[i].Name == name { + oldAllocations.Allocations[i] = newAlloc + exist = true + break + } + } + if !exist { + oldAllocations.Allocations = append(oldAllocations.Allocations, newAlloc) + } + + // update allocation string + newStr, err := json.Marshal(oldAllocations) + if err != nil { + return err + } + if annotations == nil { + annotations = map[string]string{} + } + annotations[NodeThirdPartyAllocationsAnnotationKey] = string(newStr) + return nil } diff --git a/pkg/controller/noderesource/yarn_resource_test.go b/pkg/controller/noderesource/yarn_resource_test.go new file mode 100644 index 00000000..805c09ab --- /dev/null +++ b/pkg/controller/noderesource/yarn_resource_test.go @@ -0,0 +1,136 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package noderesource + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "reflect" + "testing" +) + +func TestGetOriginExtendAllocatable(t *testing.T) { + type args struct { + annotations map[string]string + } + tests := []struct { + name string + args args + want corev1.ResourceList + wantErr bool + }{ + { + name: "annotation not exist", + args: args{ + annotations: map[string]string{}, + }, + want: nil, + wantErr: false, + }, + { + name: "bad annotation format", + args: args{ + annotations: map[string]string{ + NodeOriginExtendedAllocatableAnnotationKey: "bad-format", + }, + }, + want: nil, + wantErr: true, + }, + { + name: "get from annotation succ", + args: args{ + annotations: map[string]string{ + NodeOriginExtendedAllocatableAnnotationKey: "{\"resources\": {\"kubernetes.io/batch-cpu\": 1000,\"kubernetes.io/batch-memory\": 1024}}", + }, + }, + want: map[corev1.ResourceName]resource.Quantity{ + BatchCPU: resource.MustParse("1000"), + BatchMemory: resource.MustParse("1024"), + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetOriginExtendedAllocatableRes(tt.args.annotations) + if (err != nil) != tt.wantErr { + t.Errorf("GetOriginExtendAllocatableRes() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetOriginExtendAllocatableRes() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetNodeAllocated(t *testing.T) { + type args struct { + annotations map[string]string + } + tests := []struct { + name string + args args + want corev1.ResourceList + wantErr bool + }{ + { + name: "annotation not exist", + args: args{ + annotations: map[string]string{}, + }, + want: nil, + wantErr: false, + }, + { + name: "bad annotation format", + args: args{ + annotations: map[string]string{ + NodeThirdPartyAllocationsAnnotationKey: "bad-format", + }, + }, + want: nil, + wantErr: true, + }, + { + name: "get from annotation succ", + args: args{ + annotations: map[string]string{ + NodeThirdPartyAllocationsAnnotationKey: "{\"allocations\":[{\"name\":\"hadoop-yarn\",\"priority\":\"koord-batch\",\"resources\":{\"kubernetes.io/batch-cpu\":\"1000\",\"kubernetes.io/batch-memory\":\"1024\"}}]}", + }, + }, + want: corev1.ResourceList{ + BatchCPU: resource.MustParse("1000"), + BatchMemory: resource.MustParse("1024"), + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := GetYARNAllocatedResource(tt.args.annotations) + if (err != nil) != tt.wantErr { + t.Errorf("GetNodeAllocated() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("GetYARNAllocatedResource() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/yarn/apis/service/applicationclient_service.go b/pkg/yarn/apis/service/applicationclient_service.go index b129015c..837f9cec 100644 --- a/pkg/yarn/apis/service/applicationclient_service.go +++ b/pkg/yarn/apis/service/applicationclient_service.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package service import ( diff --git a/pkg/yarn/apis/service/ha_service.go b/pkg/yarn/apis/service/ha_service.go index 3628d91b..93adf20d 100644 --- a/pkg/yarn/apis/service/ha_service.go +++ b/pkg/yarn/apis/service/ha_service.go @@ -1,3 +1,19 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package service import ( diff --git a/pkg/yarn/apis/service/resourcemanager_administration_service.go b/pkg/yarn/apis/service/resourcemanager_administration_service.go index 4e9fc651..546f2d88 100644 --- a/pkg/yarn/apis/service/resourcemanager_administration_service.go +++ b/pkg/yarn/apis/service/resourcemanager_administration_service.go @@ -1,17 +1,17 @@ /* - Copyright 2023 The Koordinator Authors. +Copyright 2022 The Koordinator Authors. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package service diff --git a/pkg/yarn/cache/nodes_syncer.go b/pkg/yarn/cache/nodes_syncer.go index 3dec84bb..0546b5a5 100644 --- a/pkg/yarn/cache/nodes_syncer.go +++ b/pkg/yarn/cache/nodes_syncer.go @@ -31,14 +31,16 @@ const ( syncInterval = time.Second ) +// YARN RM only supports get all nodes from cluster, sync to cache for efficiency type NodesSyncer struct { - yarnClients map[string]*yarnclient.YarnClient + yarnClients map[string]yarnclient.YarnClient + // > cache map[string]map[string]*hadoopyarn.NodeReportProto mtx sync.RWMutex } -func NewNodesSyncer(yarnClients map[string]*yarnclient.YarnClient) *NodesSyncer { +func NewNodesSyncer(yarnClients map[string]yarnclient.YarnClient) *NodesSyncer { return &NodesSyncer{ yarnClients: yarnClients, cache: map[string]map[string]*hadoopyarn.NodeReportProto{}, @@ -72,7 +74,7 @@ func (r *NodesSyncer) Sync() { select { case <-t.C: if err := r.syncYARNNodeAllocatedResource(); err != nil { - klog.Error(err) + klog.Errorf("sync yarn node allocated resource failed, error: %v", err) } case <-debug.C: r.debug() @@ -113,12 +115,13 @@ func (r *NodesSyncer) syncYARNNodeAllocatedResource() error { for id, yarnClient := range r.yarnClients { nodes, err := yarnClient.GetClusterNodes(&req) if err != nil { - return err + initErr := yarnClient.Reinitialize() + return fmt.Errorf("GetClusterNodes error %v, reinitialize error %v", err, initErr) } clusterCache := map[string]*hadoopyarn.NodeReportProto{} for _, reportProto := range nodes.GetNodeReports() { if reportProto.NodeId.Host == nil || reportProto.NodeId.Port == nil { - klog.Warningf("nil node from rm") + klog.Warningf("got nil node from rm %v", id) continue } key := r.getKey(*reportProto.NodeId.Host, *reportProto.NodeId.Port) diff --git a/pkg/yarn/client/client.go b/pkg/yarn/client/client.go index 8de7f848..dd3d9745 100644 --- a/pkg/yarn/client/client.go +++ b/pkg/yarn/client/client.go @@ -18,8 +18,6 @@ package client import ( "fmt" - "os" - "k8s.io/klog/v2" "github.com/koordinator-sh/goyarn/pkg/yarn/apis/proto/hadoopcommon" @@ -28,7 +26,18 @@ import ( yarnconf "github.com/koordinator-sh/goyarn/pkg/yarn/config" ) -type YarnClient struct { +type YarnClient interface { + Initialize() error + Reinitialize() error + Close() + UpdateNodeResource(request *yarnserver.UpdateNodeResourceRequestProto) (*yarnserver.UpdateNodeResourceResponseProto, error) + GetClusterNodes(request *hadoopyarn.GetClusterNodesRequestProto) (*hadoopyarn.GetClusterNodesResponseProto, error) +} + +var _ YarnClient = &yarnClient{} + +type yarnClient struct { + confDir string conf yarnconf.YarnConfiguration haEnabled bool activeRMAdminAddress *string @@ -36,26 +45,12 @@ type YarnClient struct { clusterID string } -func CreateYarnClient() (*YarnClient, error) { - c := &YarnClient{} - if err := c.initialize(); err != nil { - return nil, err - } - return c, nil -} - -func CreateYarnClientByClusterID(clusterID string) (*YarnClient, error) { - c := &YarnClient{ - clusterID: clusterID, - } - if err := c.initialize(); err != nil { - return nil, err - } - return c, nil +func NewYarnClient(confDir string, clusterID string) YarnClient { + return &yarnClient{confDir: confDir, clusterID: clusterID} } -func (c *YarnClient) initialize() error { - if conf, err := yarnconf.NewYarnConfiguration(os.Getenv("HADOOP_CONF_DIR"), c.clusterID); err == nil { +func (c *yarnClient) Initialize() error { + if conf, err := yarnconf.NewYarnConfiguration(c.confDir, c.clusterID); err == nil { // TODO use flags for conf dir config c.conf = conf } else { @@ -103,19 +98,19 @@ func (c *YarnClient) initialize() error { return nil } -func (c *YarnClient) Close() { +func (c *yarnClient) Close() { c.activeRMAdminAddress = nil c.activeRMAddress = nil } -func (c *YarnClient) Reinitialize() error { +func (c *yarnClient) Reinitialize() error { c.Close() - return c.initialize() + return c.Initialize() } -func (c *YarnClient) UpdateNodeResource(request *yarnserver.UpdateNodeResourceRequestProto) (*yarnserver.UpdateNodeResourceResponseProto, error) { +func (c *yarnClient) UpdateNodeResource(request *yarnserver.UpdateNodeResourceRequestProto) (*yarnserver.UpdateNodeResourceResponseProto, error) { if c.activeRMAdminAddress == nil && c.haEnabled { - if err := c.initialize(); err != nil { + if err := c.Initialize(); err != nil { return nil, err } } @@ -123,9 +118,9 @@ func (c *YarnClient) UpdateNodeResource(request *yarnserver.UpdateNodeResourceRe return c.updateNodeResource(request) } -func (c *YarnClient) GetClusterNodes(request *hadoopyarn.GetClusterNodesRequestProto) (*hadoopyarn.GetClusterNodesResponseProto, error) { +func (c *yarnClient) GetClusterNodes(request *hadoopyarn.GetClusterNodesRequestProto) (*hadoopyarn.GetClusterNodesResponseProto, error) { if c.activeRMAdminAddress == nil && c.haEnabled { - if err := c.initialize(); err != nil { + if err := c.Initialize(); err != nil { return nil, err } } @@ -133,7 +128,7 @@ func (c *YarnClient) GetClusterNodes(request *hadoopyarn.GetClusterNodesRequestP return c.getClusterNodes(request) } -func (c *YarnClient) GetActiveRMID() (string, error) { +func (c *yarnClient) GetActiveRMID() (string, error) { rmIDs, err := c.conf.GetRMs() if err != nil { return "", err @@ -159,7 +154,7 @@ func (c *YarnClient) GetActiveRMID() (string, error) { return "", fmt.Errorf("active rm not found in %v", rmIDs) } -func (c *YarnClient) updateNodeResource(request *yarnserver.UpdateNodeResourceRequestProto) (*yarnserver.UpdateNodeResourceResponseProto, error) { +func (c *yarnClient) updateNodeResource(request *yarnserver.UpdateNodeResourceRequestProto) (*yarnserver.UpdateNodeResourceResponseProto, error) { // TODO keep client alive instead of create every time adminClient, err := CreateYarnAdminClient(c.conf, c.activeRMAdminAddress) if err != nil { @@ -168,7 +163,7 @@ func (c *YarnClient) updateNodeResource(request *yarnserver.UpdateNodeResourceRe return adminClient.UpdateNodeResource(request) } -func (c *YarnClient) getClusterNodes(request *hadoopyarn.GetClusterNodesRequestProto) (*hadoopyarn.GetClusterNodesResponseProto, error) { +func (c *yarnClient) getClusterNodes(request *hadoopyarn.GetClusterNodesRequestProto) (*hadoopyarn.GetClusterNodesResponseProto, error) { // TODO keep client alive instead of create every time applicationClient, err := CreateYarnApplicationClient(c.conf, c.activeRMAddress) if err != nil { diff --git a/pkg/yarn/client/examples/rm-get-cluster-nodes-with-ha/main.go b/pkg/yarn/client/examples/rm-get-cluster-nodes-with-ha/main.go index 2ac9091b..bd4c2fbc 100644 --- a/pkg/yarn/client/examples/rm-get-cluster-nodes-with-ha/main.go +++ b/pkg/yarn/client/examples/rm-get-cluster-nodes-with-ha/main.go @@ -24,8 +24,8 @@ import ( ) func main() { - // Create YarnClient - yarnClient, _ := yarnclient.CreateYarnClient() + // Create yarnClient + yarnClient, _ := yarnclient.DefaultYarnClientFactory.CreateDefaultYarnClient() request := &hadoopyarn.GetClusterNodesRequestProto{ NodeStates: []hadoopyarn.NodeStateProto{}, diff --git a/pkg/yarn/client/examples/rm-update-node-resource-with-ha/main.go b/pkg/yarn/client/examples/rm-update-node-resource-with-ha/main.go index f81255c2..394f172d 100644 --- a/pkg/yarn/client/examples/rm-update-node-resource-with-ha/main.go +++ b/pkg/yarn/client/examples/rm-update-node-resource-with-ha/main.go @@ -25,8 +25,8 @@ import ( ) func main() { - // Create YarnClient - yarnClient, _ := yarnclient.CreateYarnClient() + // Create yarnClient + yarnClient, _ := yarnclient.DefaultYarnClientFactory.CreateDefaultYarnClient() host := "0.0.0.0" port := int32(8041) diff --git a/pkg/yarn/client/factory.go b/pkg/yarn/client/factory.go new file mode 100644 index 00000000..01a1536e --- /dev/null +++ b/pkg/yarn/client/factory.go @@ -0,0 +1,93 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "io/fs" + "os" + "path/filepath" + "strings" + + "k8s.io/klog/v2" +) + +const ( + envHadoopConfDir = "HADOOP_CONF_DIR" +) + +type YarnClientFactory interface { + CreateDefaultYarnClient() (YarnClient, error) + CreateYarnClientByClusterID(clusterID string) (YarnClient, error) + CreateAllYarnClients() (map[string]YarnClient, error) +} + +var DefaultYarnClientFactory YarnClientFactory = &yarnClientFactory{configDir: os.Getenv(envHadoopConfDir)} + +type yarnClientFactory struct { + configDir string +} + +func (f *yarnClientFactory) CreateDefaultYarnClient() (YarnClient, error) { + c := NewYarnClient(f.configDir, "") + if err := c.Initialize(); err != nil { + return nil, err + } + return c, nil +} + +func (f *yarnClientFactory) CreateYarnClientByClusterID(clusterID string) (YarnClient, error) { + c := NewYarnClient(f.configDir, clusterID) + if err := c.Initialize(); err != nil { + return nil, err + } + return c, nil +} + +func (f *yarnClientFactory) CreateAllYarnClients() (map[string]YarnClient, error) { + ids, err := f.getAllKnownClusterID() + if err != nil { + return nil, err + } + clients := map[string]YarnClient{} + for _, id := range ids { + yClient, err := f.CreateYarnClientByClusterID(id) + if err != nil { + klog.Error(err) + return nil, err + } + clients[id] = yClient + klog.V(3).Infof("init yarn client %s", id) + } + return clients, nil +} + +func (f *yarnClientFactory) getAllKnownClusterID() ([]string, error) { + res := []string{} + err := filepath.WalkDir(f.configDir, func(path string, d fs.DirEntry, err error) error { + if d.IsDir() { + return nil + } + if strings.HasSuffix(d.Name(), ".yarn-site.xml") { + res = append(res, strings.ReplaceAll(d.Name(), ".yarn-site.xml", "")) + } + return nil + }) + if err != nil { + return nil, err + } + return res, nil +} diff --git a/pkg/yarn/client/mockclient/mock_client.go b/pkg/yarn/client/mockclient/mock_client.go new file mode 100644 index 00000000..55031bb2 --- /dev/null +++ b/pkg/yarn/client/mockclient/mock_client.go @@ -0,0 +1,123 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/yarn/client/client.go + +// Package mock_client is a generated GoMock package. +package mock_client + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + hadoopyarn "github.com/koordinator-sh/goyarn/pkg/yarn/apis/proto/hadoopyarn" + server "github.com/koordinator-sh/goyarn/pkg/yarn/apis/proto/hadoopyarn/server" +) + +// MockYarnClient is a mock of YarnClient interface. +type MockYarnClient struct { + ctrl *gomock.Controller + recorder *MockYarnClientMockRecorder +} + +// MockYarnClientMockRecorder is the mock recorder for MockYarnClient. +type MockYarnClientMockRecorder struct { + mock *MockYarnClient +} + +// NewMockYarnClient creates a new mock instance. +func NewMockYarnClient(ctrl *gomock.Controller) *MockYarnClient { + mock := &MockYarnClient{ctrl: ctrl} + mock.recorder = &MockYarnClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockYarnClient) EXPECT() *MockYarnClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockYarnClient) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockYarnClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockYarnClient)(nil).Close)) +} + +// GetClusterNodes mocks base method. +func (m *MockYarnClient) GetClusterNodes(request *hadoopyarn.GetClusterNodesRequestProto) (*hadoopyarn.GetClusterNodesResponseProto, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetClusterNodes", request) + ret0, _ := ret[0].(*hadoopyarn.GetClusterNodesResponseProto) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetClusterNodes indicates an expected call of GetClusterNodes. +func (mr *MockYarnClientMockRecorder) GetClusterNodes(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusterNodes", reflect.TypeOf((*MockYarnClient)(nil).GetClusterNodes), request) +} + +// Initialize mocks base method. +func (m *MockYarnClient) Initialize() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Initialize") + ret0, _ := ret[0].(error) + return ret0 +} + +// Initialize indicates an expected call of Initialize. +func (mr *MockYarnClientMockRecorder) Initialize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Initialize", reflect.TypeOf((*MockYarnClient)(nil).Initialize)) +} + +// Reinitialize mocks base method. +func (m *MockYarnClient) Reinitialize() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Reinitialize") + ret0, _ := ret[0].(error) + return ret0 +} + +// Reinitialize indicates an expected call of Reinitialize. +func (mr *MockYarnClientMockRecorder) Reinitialize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reinitialize", reflect.TypeOf((*MockYarnClient)(nil).Reinitialize)) +} + +// UpdateNodeResource mocks base method. +func (m *MockYarnClient) UpdateNodeResource(request *server.UpdateNodeResourceRequestProto) (*server.UpdateNodeResourceResponseProto, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateNodeResource", request) + ret0, _ := ret[0].(*server.UpdateNodeResourceResponseProto) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateNodeResource indicates an expected call of UpdateNodeResource. +func (mr *MockYarnClientMockRecorder) UpdateNodeResource(request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNodeResource", reflect.TypeOf((*MockYarnClient)(nil).UpdateNodeResource), request) +} diff --git a/pkg/yarn/client/mockclient/mock_factory.go b/pkg/yarn/client/mockclient/mock_factory.go new file mode 100644 index 00000000..d370c57e --- /dev/null +++ b/pkg/yarn/client/mockclient/mock_factory.go @@ -0,0 +1,97 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/yarn/client/factory.go + +// Package mock_client is a generated GoMock package. +package mock_client + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + client "github.com/koordinator-sh/goyarn/pkg/yarn/client" +) + +// MockYarnClientFactory is a mock of YarnClientFactory interface. +type MockYarnClientFactory struct { + ctrl *gomock.Controller + recorder *MockYarnClientFactoryMockRecorder +} + +// MockYarnClientFactoryMockRecorder is the mock recorder for MockYarnClientFactory. +type MockYarnClientFactoryMockRecorder struct { + mock *MockYarnClientFactory +} + +// NewMockYarnClientFactory creates a new mock instance. +func NewMockYarnClientFactory(ctrl *gomock.Controller) *MockYarnClientFactory { + mock := &MockYarnClientFactory{ctrl: ctrl} + mock.recorder = &MockYarnClientFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockYarnClientFactory) EXPECT() *MockYarnClientFactoryMockRecorder { + return m.recorder +} + +// CreateAllYarnClients mocks base method. +func (m *MockYarnClientFactory) CreateAllYarnClients() (map[string]client.YarnClient, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateAllYarnClients") + ret0, _ := ret[0].(map[string]client.YarnClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateAllYarnClients indicates an expected call of CreateAllYarnClients. +func (mr *MockYarnClientFactoryMockRecorder) CreateAllYarnClients() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateAllYarnClients", reflect.TypeOf((*MockYarnClientFactory)(nil).CreateAllYarnClients)) +} + +// CreateDefaultYarnClient mocks base method. +func (m *MockYarnClientFactory) CreateDefaultYarnClient() (client.YarnClient, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateDefaultYarnClient") + ret0, _ := ret[0].(client.YarnClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateDefaultYarnClient indicates an expected call of CreateDefaultYarnClient. +func (mr *MockYarnClientFactoryMockRecorder) CreateDefaultYarnClient() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDefaultYarnClient", reflect.TypeOf((*MockYarnClientFactory)(nil).CreateDefaultYarnClient)) +} + +// CreateYarnClientByClusterID mocks base method. +func (m *MockYarnClientFactory) CreateYarnClientByClusterID(clusterID string) (client.YarnClient, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateYarnClientByClusterID", clusterID) + ret0, _ := ret[0].(client.YarnClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateYarnClientByClusterID indicates an expected call of CreateYarnClientByClusterID. +func (mr *MockYarnClientFactoryMockRecorder) CreateYarnClientByClusterID(clusterID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateYarnClientByClusterID", reflect.TypeOf((*MockYarnClientFactory)(nil).CreateYarnClientByClusterID), clusterID) +} diff --git a/pkg/yarn/client/utils.go b/pkg/yarn/client/utils.go deleted file mode 100644 index d78d6679..00000000 --- a/pkg/yarn/client/utils.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Copyright 2022 The Koordinator Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package client - -import ( - "io/fs" - "os" - "path/filepath" - "strings" - - "k8s.io/klog/v2" -) - -func GetAllKnownClusterID() ([]string, error) { - dir := os.Getenv("HADOOP_CONF_DIR") - res := []string{} - err := filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error { - if d.IsDir() { - return nil - } - if strings.HasSuffix(d.Name(), ".yarn-site.xml") { - res = append(res, strings.ReplaceAll(d.Name(), ".yarn-site.xml", "")) - } - return nil - }) - if err != nil { - return nil, err - } - return res, nil -} - -func GetAllKnownClients() (map[string]*YarnClient, error) { - ids, err := GetAllKnownClusterID() - if err != nil { - return nil, err - } - clients := map[string]*YarnClient{} - for _, id := range ids { - yClient, err := CreateYarnClientByClusterID(id) - if err != nil { - klog.Error(err) - return nil, err - } - clients[id] = yClient - klog.V(3).Infof("init yarn client %s", id) - } - return clients, nil -}