diff --git a/go.mod b/go.mod
index 82ef369d..70291599 100644
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
 	k8s.io/api v0.26.0
 	k8s.io/apimachinery v0.26.0
 	k8s.io/client-go v0.26.0
-	k8s.io/cri-api v0.25.3
+	k8s.io/cri-api v0.25.3 // indirect
 	k8s.io/klog/v2 v2.80.1
 	k8s.io/utils v0.0.0-20221128185143-99ec85e7a448
 	sigs.k8s.io/controller-runtime v0.12.3
diff --git a/pkg/controller/noderesource/resource_sync_controller.go b/pkg/controller/noderesource/resource_sync_controller.go
index 89a48d67..a1a53638 100644
--- a/pkg/controller/noderesource/resource_sync_controller.go
+++ b/pkg/controller/noderesource/resource_sync_controller.go
@@ -24,10 +24,10 @@ import (
 	"strings"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/util/json"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
-	"k8s.io/cri-api/pkg/errors"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/pointer"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -96,10 +96,7 @@ func (r *YARNResourceSyncReconciler) Reconcile(ctx context.Context, req reconcil
 	klog.V(4).Infof("update batch resource to yarn node %+v finish, cpu-core %v, memory-mb %v, k8s node name: %s",
 		yarnNode, vcores, memoryMB, node.Name)
 
-	core, mb, err := r.getYARNNodeAllocatedResource(yarnNode)
-	if err != nil {
-		return reconcile.Result{Requeue: true}, err
-	}
+	core, mb := r.getYARNNodeAllocatedResource(yarnNode)
 	if err := r.updateYARNAllocatedResource(node, core, mb); err != nil {
 		klog.Warningf("failed to update yarn allocated resource for node %v, error %v", node.Name, err)
 		return reconcile.Result{Requeue: true}, err
@@ -144,6 +141,9 @@ func (r *YARNResourceSyncReconciler) updateYARNAllocatedResource(node *corev1.No
 		return nil
 	}
 	newNode := node.DeepCopy()
+	if newNode.Annotations == nil {
+		newNode.Annotations = map[string]string{}
+	}
 	if err := SetYARNAllocatedResource(newNode.Annotations, vcores, memoryMB); err != nil {
 		return err
 	}
@@ -316,13 +316,13 @@ func (r *YARNResourceSyncReconciler) getYARNClient(yarnNode *cache.YarnNode) (ya
 	return clusterClient, nil
 }
 
-func (r *YARNResourceSyncReconciler) getYARNNodeAllocatedResource(yarnNode *cache.YarnNode) (vcores int32, memoryMB int64, err error) {
+func (r *YARNResourceSyncReconciler) getYARNNodeAllocatedResource(yarnNode *cache.YarnNode) (vcores int32, memoryMB int64) {
 	if yarnNode == nil {
-		return 0, 0, nil
+		return 0, 0
 	}
 	nodeResource, exist := r.yarnNodeCache.GetNodeResource(yarnNode)
 	if !exist {
-		return 0, 0, nil
+		return 0, 0
 	}
 	if nodeResource.Used.VirtualCores != nil {
 		vcores = *nodeResource.Used.VirtualCores
diff --git a/pkg/controller/noderesource/resource_sync_controller_test.go b/pkg/controller/noderesource/resource_sync_controller_test.go
index 223477a2..049dbdfb 100644
--- a/pkg/controller/noderesource/resource_sync_controller_test.go
+++ b/pkg/controller/noderesource/resource_sync_controller_test.go
@@ -19,19 +19,24 @@ package noderesource
 import (
 	"context"
 	"fmt"
-	"github.com/golang/mock/gomock"
 	"reflect"
 	"testing"
+	"time"
 
+	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/utils/pointer"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
 	"github.com/koordinator-sh/yarn-copilot/pkg/yarn/apis/proto/hadoopyarn"
 	"github.com/koordinator-sh/yarn-copilot/pkg/yarn/cache"
 	yarnclient "github.com/koordinator-sh/yarn-copilot/pkg/yarn/client"
 	"github.com/koordinator-sh/yarn-copilot/pkg/yarn/client/mockclient"
@@ -186,6 +191,23 @@ func Test_getNodeBatchResource(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "return zero with empty origin allocatable",
+			args: args{
+				node: &corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{
+						Annotations: map[string]string{},
+					},
+					Status: corev1.NodeStatus{
+						Allocatable: map[corev1.ResourceName]resource.Quantity{},
+					},
+				},
+				originAllocatable: corev1.ResourceList{},
+			},
+			wantBatchCPU:    *resource.NewQuantity(0, resource.DecimalSI),
+			wantBatchMemory: *resource.NewQuantity(0, resource.BinarySI),
+			wantErr:         false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -319,6 +341,53 @@ func TestYARNResourceSyncReconciler_getYARNNodeManagerPod(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "get first node manager pod",
+			args: args{
+				node: &corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "test-node",
+					},
+				},
+				pods: []*corev1.Pod{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "test-nm-pod",
+							Labels: map[string]string{
+								YarnNMComponentLabel: YarnNMComponentValue,
+							},
+						},
+						Spec: corev1.PodSpec{
+							NodeName: "test-node",
+						},
+					},
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "test-nm-pod2",
+							Labels: map[string]string{
+								YarnNMComponentLabel: YarnNMComponentValue,
+							},
+						},
+						Spec: corev1.PodSpec{
+							NodeName: "test-node",
+						},
+					},
+				},
+			},
+			want: &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test-nm-pod",
+					Labels: map[string]string{
+						YarnNMComponentLabel: YarnNMComponentValue,
+					},
+					ResourceVersion: "1",
+				},
+				Spec: corev1.PodSpec{
+					NodeName: "test-node",
+				},
+			},
+			wantErr: false,
+		},
 		{
 			name: "node manager node found because of node label",
 			args: args{
@@ -384,6 +453,19 @@ func TestYARNResourceSyncReconciler_getYARNNode1(t *testing.T) {
 			want:    nil,
 			wantErr: false,
 		},
+		{
+			name: "get empty node list",
+			args: args{
+				node: &corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "test-node",
+					},
+				},
+				pods: []*corev1.Pod{},
+			},
+			want:    nil,
+			wantErr: false,
+		},
 		{
 			name: "get yarn node",
 			args: args{
@@ -664,7 +746,7 @@ func TestYARNResourceSyncReconciler_getYARNClient(t *testing.T) {
 			}
 			for clusterID, client := range tt.fields.yarnClientsFromFactory {
 				createError := tt.args.factoryCreateClusterError[clusterID]
-				mockYarnClientFactory.EXPECT().CreateYarnClientByClusterID(clusterID).Return(client, createError)
+				mockYarnClientFactory.EXPECT().CreateYarnClientByClusterID(clusterID).Return(client, createError).AnyTimes()
 			}
 			r := &YARNResourceSyncReconciler{
 				yarnClient: tt.fields.yarnClientOfReconciler,
@@ -772,36 +854,75 @@ func TestYARNResourceSyncReconciler_getYARNNodeAllocatedResource(t *testing.T) {
 	type args struct {
 		yarnNode *cache.YarnNode
 	}
+	type fields struct {
+		yarnNodesProto *hadoopyarn.GetClusterNodesResponseProto
+	}
 	tests := []struct {
 		name         string
 		args         args
+		fields       fields
 		wantVcores   int32
 		wantMemoryMB int64
-		wantErr      bool
 	}{
 		{
 			name: "nil node return nothing",
+			args: args{
+				yarnNode: nil,
+			},
+			wantVcores:   0,
+			wantMemoryMB: 0,
+		},
+		{
+			name: "nil node resource return nothing",
 			args: args{
 				yarnNode: &cache.YarnNode{
-					Name: "test-yarn-node",
-					Port: 8041,
+					Name:      "test-yarn-node",
+					Port:      8041,
+					ClusterID: yarnclient.DefaultClusterID,
 				},
 			},
 			wantVcores:   0,
 			wantMemoryMB: 0,
-			wantErr:      false,
 		},
 		{
 			name: "get yarn node not exist",
 			args: args{
 				yarnNode: &cache.YarnNode{
-					Name: "test-yarn-node",
-					Port: 8041,
+					Name:      "test-yarn-node",
+					Port:      8041,
+					ClusterID: yarnclient.DefaultClusterID,
 				},
 			},
 			wantVcores:   0,
 			wantMemoryMB: 0,
-			wantErr:      false,
+		},
+		{
+			name: "get yarn node resource",
+			args: args{
+				yarnNode: &cache.YarnNode{
+					Name:      "test-yarn-node",
+					Port:      8041,
+					ClusterID: yarnclient.DefaultClusterID,
+				},
+			},
+			fields: fields{
+				yarnNodesProto: &hadoopyarn.GetClusterNodesResponseProto{
+					NodeReports: []*hadoopyarn.NodeReportProto{
+						{
+							NodeId: &hadoopyarn.NodeIdProto{
+								Host: pointer.String("test-yarn-node"),
+								Port: pointer.Int32(8041),
+							},
+							Used: &hadoopyarn.ResourceProto{
+								Memory:       pointer.Int64(1024),
+								VirtualCores: pointer.Int32(10),
+							},
+						},
+					},
+				},
+			},
+			wantVcores:   10,
+			wantMemoryMB: 1024,
 		},
 	}
 	for _, tt := range tests {
@@ -811,16 +932,19 @@ func TestYARNResourceSyncReconciler_getYARNNodeAllocatedResource(t *testing.T) {
 			mockYarnClientFactory := mock_client.NewMockYarnClientFactory(ctrl)
 			yarnclient.DefaultYarnClientFactory = mockYarnClientFactory
 			yarnClient := mock_client.NewMockYarnClient(ctrl)
-			yarnNodeCache := cache.NewNodesSyncer(map[string]yarnclient.YarnClient{"default": yarnClient})
+			yarnClient.EXPECT().GetClusterNodes(gomock.Any()).Return(tt.fields.yarnNodesProto, nil).AnyTimes()
+			yarnNodeCache := cache.NewNodesSyncer(map[string]yarnclient.YarnClient{yarnclient.DefaultClusterID: yarnClient})
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			assert.NoError(t, yarnNodeCache.Start(ctx))
+			assert.NoError(t, wait.PollImmediateUntil(10*time.Millisecond, func() (bool, error) {
+				return yarnNodeCache.Started(), nil
+			}, ctx.Done()))
 			r := &YARNResourceSyncReconciler{
 				yarnNodeCache: yarnNodeCache,
 			}
 
-			gotVcores, gotMemoryMB, err := r.getYARNNodeAllocatedResource(tt.args.yarnNode)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("getYARNNodeAllocatedResource() error = %v, wantErr %v", err, tt.wantErr)
-				return
-			}
+			gotVcores, gotMemoryMB := r.getYARNNodeAllocatedResource(tt.args.yarnNode)
 			if gotVcores != tt.wantVcores {
 				t.Errorf("getYARNNodeAllocatedResource() gotVcores = %v, want %v", gotVcores, tt.wantVcores)
 			}
@@ -830,3 +954,263 @@ func TestYARNResourceSyncReconciler_getYARNNodeAllocatedResource(t *testing.T) {
 		})
 	}
 }
+
+func TestYARNResourceSyncReconciler_Reconcile(t *testing.T) {
+	type fields struct {
+		node           *corev1.Node
+		pods           []*corev1.Pod
+		yarnNodesProto *hadoopyarn.GetClusterNodesResponseProto
+		yarnUpdateErr  error
+	}
+	type args struct {
+		nodeName string
+	}
+	tests := []struct {
+		name              string
+		fields            fields
+		args              args
+		want              reconcile.Result
+		wantErr           bool
+		wantYARNAllocated corev1.ResourceList
+	}{
+		{
+			name: "parse origin allocated failure",
+			fields: fields{
+				node: &corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "test-node-name",
+						Annotations: map[string]string{
+							NodeOriginExtendedAllocatableAnnotationKey: "bad-fmt",
+						},
+					},
+				},
+				pods: []*corev1.Pod{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "test-nm-pod",
+							Labels: map[string]string{
+								YarnNMComponentLabel: YarnNMComponentValue,
+							},
+							Annotations: map[string]string{
+								YarnNodeIdAnnotation: "test-yarn-node:8041",
+							},
+						},
+						Spec: corev1.PodSpec{
+							NodeName: "test-node-name",
+						},
+					},
+				},
+				yarnNodesProto: &hadoopyarn.GetClusterNodesResponseProto{
+					NodeReports: []*hadoopyarn.NodeReportProto{
+						{
+							NodeId: &hadoopyarn.NodeIdProto{
+								Host: pointer.String("test-yarn-node"),
+								Port: pointer.Int32(8041),
+							},
+							Used: &hadoopyarn.ResourceProto{
+								Memory:       pointer.Int64(1024),
+								VirtualCores: pointer.Int32(10),
+							},
+						},
+					},
+				},
+			},
+			args: args{
+				nodeName: "test-node-name",
+			},
+			want:              reconcile.Result{Requeue: true},
+			wantErr:           true,
+			wantYARNAllocated: nil,
+		},
+		{
+			name: "update to yarn rm failure",
+			fields: fields{
+				node: &corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{Name: "test-node-name"},
+				},
+				pods: []*corev1.Pod{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "test-nm-pod",
+							Labels: map[string]string{
+								YarnNMComponentLabel: YarnNMComponentValue,
+							},
+							Annotations: map[string]string{
+								YarnNodeIdAnnotation: "test-yarn-node:8041",
+							},
+						},
+						Spec: corev1.PodSpec{
+							NodeName: "test-node-name",
+						},
+					},
+				},
+				yarnNodesProto: &hadoopyarn.GetClusterNodesResponseProto{
+					NodeReports: []*hadoopyarn.NodeReportProto{
+						{
+							NodeId: &hadoopyarn.NodeIdProto{
+								Host: pointer.String("test-yarn-node"),
+								Port: pointer.Int32(8041),
+							},
+							Used: &hadoopyarn.ResourceProto{
+								Memory:       pointer.Int64(1024),
+								VirtualCores: pointer.Int32(10),
+							},
+						},
+					},
+				},
+				yarnUpdateErr: fmt.Errorf("update yarn failed"),
+			},
+			args: args{
+				nodeName: "test-node-name",
+			},
+			want:              reconcile.Result{Requeue: true},
+			wantErr:           true,
+			wantYARNAllocated: nil,
+		},
+		{
+			name: "update yarn allocated success",
+			fields: fields{
+				node: &corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{Name: "test-node-name"},
+				},
+				pods: []*corev1.Pod{
+					{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "test-nm-pod",
+							Labels: map[string]string{
+								YarnNMComponentLabel: YarnNMComponentValue,
+							},
+							Annotations: map[string]string{
+								YarnNodeIdAnnotation: "test-yarn-node:8041",
+							},
+						},
+						Spec: corev1.PodSpec{
+							NodeName: "test-node-name",
+						},
+					},
+				},
+				yarnNodesProto: &hadoopyarn.GetClusterNodesResponseProto{
+					NodeReports: []*hadoopyarn.NodeReportProto{
+						{
+							NodeId: &hadoopyarn.NodeIdProto{
+								Host: pointer.String("test-yarn-node"),
+								Port: pointer.Int32(8041),
+							},
+							Used: &hadoopyarn.ResourceProto{
+								Memory:       pointer.Int64(1024),
+								VirtualCores: pointer.Int32(10),
+							},
+						},
+					},
+				},
+			},
+			args: args{
+				nodeName: "test-node-name",
+			},
+			want:    reconcile.Result{},
+			wantErr: false,
+			wantYARNAllocated: corev1.ResourceList{
+				BatchCPU:    resource.MustParse("10k"),
+				BatchMemory: resource.MustParse("1Gi"),
+			},
+		},
+		{
+			name:   "return nothing since node not found",
+			fields: fields{},
+			args: args{
+				nodeName: "test-node-name",
+			},
+			want:              reconcile.Result{},
+			wantErr:           false,
+			wantYARNAllocated: nil,
+		},
+		{
+			name: "nm pod has removed from node",
+			fields: fields{
+				node: &corev1.Node{
+					ObjectMeta: metav1.ObjectMeta{Name: "test-node-name"},
+				},
+				yarnNodesProto: &hadoopyarn.GetClusterNodesResponseProto{
+					NodeReports: []*hadoopyarn.NodeReportProto{
+						{
+							NodeId: &hadoopyarn.NodeIdProto{
+								Host: pointer.String("test-yarn-node"),
+								Port: pointer.Int32(8041),
+							},
+							Used: &hadoopyarn.ResourceProto{
+								Memory:       pointer.Int64(1024),
+								VirtualCores: pointer.Int32(10),
+							},
+						},
+					},
+				},
+			},
+			args: args{
+				nodeName: "test-node-name",
+			},
+			want:    reconcile.Result{},
+			wantErr: false,
+			wantYARNAllocated: corev1.ResourceList{
+				BatchCPU:    resource.MustParse("0"),
+				BatchMemory: resource.MustParse("0"),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			scheme := runtime.NewScheme()
+			_ = clientgoscheme.AddToScheme(scheme)
+			client := fake.NewClientBuilder().WithScheme(scheme).Build()
+			ctrl := gomock.NewController(t)
+			defer ctrl.Finish()
+			mockYarnClientFactory := mock_client.NewMockYarnClientFactory(ctrl)
+			yarnclient.DefaultYarnClientFactory = mockYarnClientFactory
+			yarnClient := mock_client.NewMockYarnClient(ctrl)
+			mockYarnClientFactory.EXPECT().CreateYarnClientByClusterID(yarnclient.DefaultClusterID).Return(yarnClient, nil).AnyTimes()
+			yarnClient.EXPECT().GetClusterNodes(gomock.Any()).Return(tt.fields.yarnNodesProto, nil).AnyTimes()
+			yarnClient.EXPECT().Reinitialize().Return(nil).AnyTimes()
+			yarnClient.EXPECT().UpdateNodeResource(gomock.Any()).Return(nil, tt.fields.yarnUpdateErr).AnyTimes()
+			yarnNodeCache := cache.NewNodesSyncer(map[string]yarnclient.YarnClient{yarnclient.DefaultClusterID: yarnClient})
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			assert.NoError(t, yarnNodeCache.Start(ctx))
+			assert.NoError(t, wait.PollImmediateUntil(10*time.Millisecond, func() (bool, error) {
+				return yarnNodeCache.Started(), nil
+			}, ctx.Done()))
+
+			r := &YARNResourceSyncReconciler{
+				Client:        client,
+				yarnNodeCache: yarnNodeCache,
+			}
+			if tt.fields.node != nil {
+				err := r.Client.Create(ctx, tt.fields.node)
+				assert.NoError(t, err)
+			}
+			for _, pod := range tt.fields.pods {
+				err := r.Client.Create(ctx, pod)
+				assert.NoError(t, err)
+			}
+
+			got, err := r.Reconcile(ctx, reconcile.Request{
+				NamespacedName: types.NamespacedName{
+					Name: tt.args.nodeName,
+				},
+			})
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			assert.Equal(t, tt.wantErr, err != nil)
+			assert.Equal(t, tt.want, got)
+			if tt.fields.node != nil {
+				node := &corev1.Node{}
+				err = r.Client.Get(ctx, types.NamespacedName{Name: tt.args.nodeName}, node)
+				assert.NoError(t, err)
+				yarnAllocatedResList, err := GetYARNAllocatedResource(node.Annotations)
+				assert.Equal(t, tt.wantYARNAllocated, yarnAllocatedResList)
+				assert.NoError(t, err)
+			}
+		})
+	}
+}
diff --git a/pkg/yarn/cache/nodes_syncer.go b/pkg/yarn/cache/nodes_syncer.go
index cfba6fd1..8f0e7382 100644
--- a/pkg/yarn/cache/nodes_syncer.go
+++ b/pkg/yarn/cache/nodes_syncer.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"k8s.io/klog/v2"
@@ -35,6 +36,7 @@ const (
 // YARN RM only supports get all nodes from cluster, sync to cache for efficiency
 type NodesSyncer struct {
 	yarnClients map[string]yarnclient.YarnClient
+	started     atomic.Bool
 	// <clusterID, <nodeID, NodeReportProto>>
 	cache map[string]map[string]*hadoopyarn.NodeReportProto
@@ -77,6 +79,8 @@ func (r *NodesSyncer) Start(ctx context.Context) error {
 			case <-t.C:
 				if err := r.syncYARNNodeAllocatedResource(); err != nil {
 					klog.Errorf("sync yarn node allocated resource failed, error: %v", err)
+				} else {
+					r.started.Store(true)
 				}
 			case <-debug.C:
 				r.debug()
@@ -89,6 +93,10 @@ func (r *NodesSyncer) Start(ctx context.Context) error {
 	return nil
 }
 
+func (r *NodesSyncer) Started() bool {
+	return r.started.Load()
+}
+
 func (r *NodesSyncer) debug() {
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
@@ -125,6 +133,9 @@ func (r *NodesSyncer) syncYARNNodeAllocatedResource() error {
 			initErr := yarnClient.Reinitialize()
 			return fmt.Errorf("GetClusterNodes error %v, reinitialize error %v", err, initErr)
 		}
+		if nodes == nil {
+			continue
+		}
 		clusterCache := map[string]*hadoopyarn.NodeReportProto{}
 		for _, reportProto := range nodes.GetNodeReports() {
 			if reportProto.NodeId.Host == nil || reportProto.NodeId.Port == nil {