Skip to content

Commit

Permalink
yarn-operator: fix get cluster node failed when one of rm failed (#46)
Browse files Browse the repository at this point in the history
Signed-off-by: 佑祎 <[email protected]>
  • Loading branch information
zwzhang0107 authored Nov 14, 2023
1 parent d347d17 commit ff03eb4
Show file tree
Hide file tree
Showing 18 changed files with 1,064 additions and 185 deletions.
4 changes: 3 additions & 1 deletion .licenseignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
vendor
pkg/yarn/apis
pkg/yarn/apis/auth
pkg/yarn/apis/proto
pkg/yarn/apis/security
pkg/yarn/client/ipc
pkg/yarn/config
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.24.2 // indirect
k8s.io/apiserver v0.26.0 // indirect
k8s.io/apiserver v0.26.0
k8s.io/component-base v0.26.0 // indirect
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
sigs.k8s.io/scheduler-plugins v0.24.15 // indirect
Expand All @@ -66,6 +66,7 @@ require (
require (
github.com/gin-gonic/gin v1.8.1
github.com/go-resty/resty/v2 v2.7.0
github.com/golang/mock v1.6.0
github.com/opencontainers/runc v1.1.6
github.com/stretchr/testify v1.8.2
)
Expand Down
31 changes: 31 additions & 0 deletions hack/mock-gen.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/bin/bash
#
# Copyright 2022-2023 The Koordinator Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -e

SHELL_FOLDER=$(cd "$(dirname "$0")";pwd)
LICENSE_HEADER_PATH="./hack/boilerplate/boilerplate.go.txt"

cd $GOPATH/src/github.com/koordinator-sh/goyarn

# generates gomock files
mockgen -source pkg/yarn/client/factory.go \
-destination pkg/yarn/client/mockclient/mock_factory.go \
-copyright_file ${LICENSE_HEADER_PATH}
mockgen -source pkg/yarn/client/client.go \
-destination pkg/yarn/client/mockclient/mock_client.go \
-copyright_file ${LICENSE_HEADER_PATH}
55 changes: 28 additions & 27 deletions pkg/controller/noderesource/resource_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ const (

type YARNResourceSyncReconciler struct {
client.Client
yarnClient *yarnclient.YarnClient
yarnClients map[string]*yarnclient.YarnClient
yarnClient yarnclient.YarnClient
yarnClients map[string]yarnclient.YarnClient
yarnNodeCache *cache.NodesSyncer
}

Expand Down Expand Up @@ -119,18 +119,16 @@ func getNodeBatchResource(node *corev1.Node) (batchCPU resource.Quantity, batchM
if !memExist {
batchMemory = *resource.NewQuantity(0, resource.BinarySI)
}
if node.Annotations == nil || len(node.Annotations[NodeOriginAllocatableAnnotationKey]) == 0 {
originAllocatableRes, err := GetOriginExtendedAllocatableRes(node.Annotations)
if err == nil && originAllocatableRes == nil {
// koordiantor <= 1.3.0, use node status as origin batch total
return
}

originAllocatable, err := GetOriginExtendAllocatable(node.Annotations)
if err != nil {
} else if err != nil {
klog.Warningf("get origin allocatable from node %v annotation failed, error: %v", node.Name, err)
return
}
batchCPU, cpuExist = originAllocatable.Resources[BatchCPU]
batchMemory, memExist = originAllocatable.Resources[BatchMemory]
batchCPU, cpuExist = originAllocatableRes[BatchCPU]
batchMemory, memExist = originAllocatableRes[BatchMemory]
if !cpuExist {
batchCPU = *resource.NewQuantity(0, resource.DecimalSI)
}
Expand All @@ -144,19 +142,11 @@ func (r *YARNResourceSyncReconciler) updateYARNAllocatedResource(node *corev1.No
if node == nil {
return nil
}
nodeAllocated := &NodeAllocated{
YARNAllocated: map[corev1.ResourceName]resource.Quantity{
BatchCPU: *resource.NewQuantity(int64(vcores), resource.DecimalSI),
BatchMemory: *resource.NewQuantity(memoryMB*1024*1024, resource.BinarySI),
},
}

nodeAllocatedStr, err := json.Marshal(nodeAllocated)
if err != nil {
newNode := node.DeepCopy()
if err := SetYARNAllocatedResource(newNode.Annotations, vcores, memoryMB); err != nil {
return err
}
newNode := node.DeepCopy()
newNode.Annotations[NodeAllocatedResourceAnnotationKey] = string(nodeAllocatedStr)

oldData, err := json.Marshal(node)
if err != nil {
return fmt.Errorf("failed to marshal the existing node %#v: %v", node, err)
Expand All @@ -175,7 +165,7 @@ func (r *YARNResourceSyncReconciler) updateYARNAllocatedResource(node *corev1.No
}

func Add(mgr ctrl.Manager) error {
clients, err := yarnclient.GetAllKnownClients()
clients, err := yarnclient.DefaultYarnClientFactory.CreateAllYarnClients()
if err != nil {
return err
}
Expand Down Expand Up @@ -251,13 +241,16 @@ func (r *YARNResourceSyncReconciler) getYARNNode(node *corev1.Node) (*cache.Yarn
Name: tokens[0],
Port: int32(port),
}
if clusterID, exist := nmPod.Annotations[YarnClusterIDAnnotation]; exist {
if clusterID, exist := nmPod.Annotations[PodYarnClusterIDAnnotationKey]; exist {
yarnNode.ClusterID = clusterID
}
return yarnNode, nil
}

func (r *YARNResourceSyncReconciler) updateYARNNodeResource(yarnNode *cache.YarnNode, vcores, memoryMB int64) error {
if yarnNode == nil {
return nil
}
request := &yarnserver.UpdateNodeResourceRequestProto{
NodeResourceMap: []*hadoopyarn.NodeResourceMapProto{
{
Expand All @@ -275,7 +268,7 @@ func (r *YARNResourceSyncReconciler) updateYARNNodeResource(yarnNode *cache.Yarn
},
}
yarnClient, err := r.getYARNClient(yarnNode)
if err != nil {
if err != nil || yarnClient == nil {
return err
}
if resp, err := yarnClient.UpdateNodeResource(request); err != nil {
Expand All @@ -285,11 +278,13 @@ func (r *YARNResourceSyncReconciler) updateYARNNodeResource(yarnNode *cache.Yarn
return nil
}

func (r *YARNResourceSyncReconciler) getYARNClient(yarnNode *cache.YarnNode) (*yarnclient.YarnClient, error) {
if yarnNode.ClusterID == "" && r.yarnClient != nil {
func (r *YARNResourceSyncReconciler) getYARNClient(yarnNode *cache.YarnNode) (yarnclient.YarnClient, error) {
if yarnNode == nil {
return nil, nil
} else if yarnNode.ClusterID == "" && r.yarnClient != nil {
return r.yarnClient, nil
} else if yarnNode.ClusterID == "" && r.yarnClient == nil {
yarnClient, err := yarnclient.CreateYarnClient()
yarnClient, err := yarnclient.DefaultYarnClientFactory.CreateDefaultYarnClient()
if err != nil {
return nil, err
}
Expand All @@ -302,15 +297,21 @@ func (r *YARNResourceSyncReconciler) getYARNClient(yarnNode *cache.YarnNode) (*y
return clusterClient, nil
}
// create new client by cluster id
clusterClient, err := yarnclient.CreateYarnClientByClusterID(yarnNode.ClusterID)
clusterClient, err := yarnclient.DefaultYarnClientFactory.CreateYarnClientByClusterID(yarnNode.ClusterID)
if err != nil {
return nil, err
}
if r.yarnClients == nil {
r.yarnClients = map[string]yarnclient.YarnClient{}
}
r.yarnClients[yarnNode.ClusterID] = clusterClient
return clusterClient, nil
}

func (r *YARNResourceSyncReconciler) getYARNNodeAllocatedResource(yarnNode *cache.YarnNode) (vcores int32, memoryMB int64, err error) {
if yarnNode == nil {
return 0, 0, nil
}
nodeResource, exist := r.yarnNodeCache.GetNodeResource(yarnNode)
if !exist {
return 0, 0, nil
Expand Down
Loading

0 comments on commit ff03eb4

Please sign in to comment.