diff --git a/hack/generate-proto.sh b/hack/generate-proto.sh index 614ded3e8a..bf970ce318 100755 --- a/hack/generate-proto.sh +++ b/hack/generate-proto.sh @@ -75,6 +75,8 @@ gen-protoc(){ gen-protoc pkg/apis/proto/daemon/daemon.proto +gen-protoc pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto + gen-protoc pkg/apis/proto/isb/message.proto gen-protoc pkg/apis/proto/wmb/wmb.proto diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go index 7dd39fd2e8..f132575974 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go @@ -123,6 +123,14 @@ func (mv MonoVertex) GetDaemonDeploymentName() string { return fmt.Sprintf("%s-mv-daemon", mv.Name) } +func (mv MonoVertex) GetDaemonServiceURL() string { + return fmt.Sprintf("%s.%s.svc:%d", mv.GetDaemonServiceName(), mv.Namespace, MonoVertexDaemonServicePort) +} + +func (mv MonoVertex) Scalable() bool { + return !mv.Spec.Scale.Disabled +} + func (mv MonoVertex) GetDaemonServiceObj() *corev1.Service { labels := map[string]string{ KeyPartOf: Project, @@ -518,6 +526,18 @@ func (mvs *MonoVertexStatus) MarkPhaseRunning() { mvs.MarkPhase(MonoVertexPhaseRunning, "", "") } +// IsHealthy indicates whether the MonoVertex is in healthy status +func (mvs *MonoVertexStatus) IsHealthy() bool { + switch mvs.Phase { + case MonoVertexPhaseFailed: + return false + case MonoVertexPhaseRunning: + return mvs.IsReady() + default: + return false + } +} + // +kubebuilder:object:root=true // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type MonoVertexList struct { diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.go b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.go new file mode 100644 index 0000000000..10d3e2350c --- /dev/null +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.go @@ -0,0 +1,299 @@ +// +//Copyright 2022 The Numaproj Authors. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v5.27.2 +// source: pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto + +package mvtxdaemon + +import ( + _ "google.golang.org/genproto/googleapis/api/annotations" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// MonoVertexMetrics is used to provide information about the mono vertex including processing rate. +type MonoVertexMetrics struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + MonoVertex string `protobuf:"bytes,1,opt,name=monoVertex,proto3" json:"monoVertex,omitempty"` + // Processing rate in the past period of time, 1m, 5m, 15m, default + ProcessingRates map[string]*wrapperspb.DoubleValue `protobuf:"bytes,2,rep,name=processingRates,proto3" json:"processingRates,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Pending in the past period of time, 1m, 5m, 15m, default + Pendings map[string]*wrapperspb.Int64Value `protobuf:"bytes,3,rep,name=pendings,proto3" json:"pendings,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *MonoVertexMetrics) Reset() { + *x = MonoVertexMetrics{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MonoVertexMetrics) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MonoVertexMetrics) ProtoMessage() {} + +func (x *MonoVertexMetrics) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MonoVertexMetrics.ProtoReflect.Descriptor instead. +func (*MonoVertexMetrics) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescGZIP(), []int{0} +} + +func (x *MonoVertexMetrics) GetMonoVertex() string { + if x != nil { + return x.MonoVertex + } + return "" +} + +func (x *MonoVertexMetrics) GetProcessingRates() map[string]*wrapperspb.DoubleValue { + if x != nil { + return x.ProcessingRates + } + return nil +} + +func (x *MonoVertexMetrics) GetPendings() map[string]*wrapperspb.Int64Value { + if x != nil { + return x.Pendings + } + return nil +} + +type GetMonoVertexMetricsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metrics *MonoVertexMetrics `protobuf:"bytes,1,opt,name=metrics,proto3" json:"metrics,omitempty"` +} + +func (x *GetMonoVertexMetricsResponse) Reset() { + *x = GetMonoVertexMetricsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetMonoVertexMetricsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetMonoVertexMetricsResponse) ProtoMessage() {} + +func (x *GetMonoVertexMetricsResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetMonoVertexMetricsResponse.ProtoReflect.Descriptor instead. +func (*GetMonoVertexMetricsResponse) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescGZIP(), []int{1} +} + +func (x *GetMonoVertexMetricsResponse) GetMetrics() *MonoVertexMetrics { + if x != nil { + return x.Metrics + } + return nil +} + +var File_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto protoreflect.FileDescriptor + +var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDesc = []byte{ + 0x0a, 0x2a, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2f, 0x6d, 0x76, 0x74, 0x78, + 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x6d, 0x76, + 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x77, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x96, 0x03, 0x0a, 0x11, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, + 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x6d, 0x6f, 0x6e, + 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6d, + 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x12, 0x5c, 0x0a, 0x0f, 0x70, 0x72, 0x6f, + 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x52, 0x61, 0x74, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, + 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, + 0x73, 0x2e, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x52, 0x61, 0x74, 0x65, + 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, + 0x6e, 0x67, 0x52, 0x61, 0x74, 0x65, 0x73, 0x12, 0x47, 0x0a, 0x08, 0x70, 0x65, 0x6e, 0x64, 0x69, + 0x6e, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x76, 0x74, 0x78, + 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, + 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x2e, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, + 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x70, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x73, + 0x1a, 0x60, 0x0a, 0x14, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x52, 0x61, + 0x74, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x32, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, + 0x6c, 0x65, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, + 0x38, 0x01, 0x1a, 0x58, 0x0a, 0x0d, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x73, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x31, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x49, 0x6e, 0x74, 0x36, 0x34, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x57, 0x0a, 0x1c, + 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x37, 0x0a, 0x07, + 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, + 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, + 0x65, 0x72, 0x74, 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x07, 0x6d, 0x65, + 0x74, 0x72, 0x69, 0x63, 0x73, 0x32, 0x8c, 0x01, 0x0a, 0x17, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, + 0x72, 0x74, 0x65, 0x78, 0x44, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x12, 0x71, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, + 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x1a, 0x28, 0x2e, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x47, + 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, + 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x17, 0x82, 0xd3, 0xe4, + 0x93, 0x02, 0x11, 0x12, 0x0f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x74, + 0x72, 0x69, 0x63, 0x73, 0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, + 0x66, 0x6c, 0x6f, 0x77, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescOnce sync.Once + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescData = file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDesc +) + +func file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescGZIP() []byte { + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescOnce.Do(func() { + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescData) + }) + return file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescData +} + +var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_goTypes = []any{ + (*MonoVertexMetrics)(nil), // 0: mvtxdaemon.MonoVertexMetrics + (*GetMonoVertexMetricsResponse)(nil), // 1: mvtxdaemon.GetMonoVertexMetricsResponse + nil, // 2: mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry + nil, // 3: mvtxdaemon.MonoVertexMetrics.PendingsEntry + (*wrapperspb.DoubleValue)(nil), // 4: google.protobuf.DoubleValue + (*wrapperspb.Int64Value)(nil), // 5: google.protobuf.Int64Value + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty +} +var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_depIdxs = []int32{ + 2, // 0: mvtxdaemon.MonoVertexMetrics.processingRates:type_name -> mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry + 3, // 1: mvtxdaemon.MonoVertexMetrics.pendings:type_name -> mvtxdaemon.MonoVertexMetrics.PendingsEntry + 0, // 2: mvtxdaemon.GetMonoVertexMetricsResponse.metrics:type_name -> mvtxdaemon.MonoVertexMetrics + 4, // 3: mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry.value:type_name -> google.protobuf.DoubleValue + 5, // 4: mvtxdaemon.MonoVertexMetrics.PendingsEntry.value:type_name -> google.protobuf.Int64Value + 6, // 5: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexMetrics:input_type -> google.protobuf.Empty + 1, // 6: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexMetrics:output_type -> mvtxdaemon.GetMonoVertexMetricsResponse + 6, // [6:7] is the sub-list for method output_type + 5, // [5:6] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_init() } +func file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_init() { + if File_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*MonoVertexMetrics); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*GetMonoVertexMetricsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_goTypes, + DependencyIndexes: file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_depIdxs, + MessageInfos: file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes, + }.Build() + File_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto = out.File + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDesc = nil + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_goTypes = nil + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_depIdxs = nil +} diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.gw.go b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.gw.go new file mode 100644 index 0000000000..97c8075676 --- /dev/null +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.gw.go @@ -0,0 +1,156 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto + +/* +Package mvtxdaemon is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package mvtxdaemon + +import ( + "context" + "io" + "net/http" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = metadata.Join + +func request_MonoVertexDaemonService_GetMonoVertexMetrics_0(ctx context.Context, marshaler runtime.Marshaler, client MonoVertexDaemonServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := client.GetMonoVertexMetrics(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_MonoVertexDaemonService_GetMonoVertexMetrics_0(ctx context.Context, marshaler runtime.Marshaler, server MonoVertexDaemonServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := server.GetMonoVertexMetrics(ctx, &protoReq) + return msg, metadata, err + +} + +// RegisterMonoVertexDaemonServiceHandlerServer registers the http handlers for service MonoVertexDaemonService to "mux". +// UnaryRPC :call MonoVertexDaemonServiceServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterMonoVertexDaemonServiceHandlerFromEndpoint instead. +func RegisterMonoVertexDaemonServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server MonoVertexDaemonServiceServer) error { + + mux.Handle("GET", pattern_MonoVertexDaemonService_GetMonoVertexMetrics_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexMetrics", runtime.WithHTTPPathPattern("/api/v1/metrics")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_MonoVertexDaemonService_GetMonoVertexMetrics_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_MonoVertexDaemonService_GetMonoVertexMetrics_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +// RegisterMonoVertexDaemonServiceHandlerFromEndpoint is same as RegisterMonoVertexDaemonServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterMonoVertexDaemonServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.DialContext(ctx, endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterMonoVertexDaemonServiceHandler(ctx, mux, conn) +} + +// RegisterMonoVertexDaemonServiceHandler registers the http handlers for service MonoVertexDaemonService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterMonoVertexDaemonServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterMonoVertexDaemonServiceHandlerClient(ctx, mux, NewMonoVertexDaemonServiceClient(conn)) +} + +// RegisterMonoVertexDaemonServiceHandlerClient registers the http handlers for service MonoVertexDaemonService +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "MonoVertexDaemonServiceClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "MonoVertexDaemonServiceClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "MonoVertexDaemonServiceClient" to call the correct interceptors. +func RegisterMonoVertexDaemonServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client MonoVertexDaemonServiceClient) error { + + mux.Handle("GET", pattern_MonoVertexDaemonService_GetMonoVertexMetrics_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexMetrics", runtime.WithHTTPPathPattern("/api/v1/metrics")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_MonoVertexDaemonService_GetMonoVertexMetrics_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_MonoVertexDaemonService_GetMonoVertexMetrics_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_MonoVertexDaemonService_GetMonoVertexMetrics_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "v1", "metrics"}, "")) +) + +var ( + forward_MonoVertexDaemonService_GetMonoVertexMetrics_0 = runtime.ForwardResponseMessage +) diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto new file mode 100644 index 0000000000..512b5bf515 --- /dev/null +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto @@ -0,0 +1,47 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +syntax = "proto3"; + +option go_package = "github.com/numaproj/numaflow/pkg/apis/proto/mvtxdaemon"; + +import "google/api/annotations.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/wrappers.proto"; + +package mvtxdaemon; + +// MonoVertexMetrics is used to provide information about the mono vertex including processing rate. +message MonoVertexMetrics { + string monoVertex = 1; + // Processing rate in the past period of time, 1m, 5m, 15m, default + map processingRates = 2; + // Pending in the past period of time, 1m, 5m, 15m, default + map pendings = 3; +} + +message GetMonoVertexMetricsResponse { + MonoVertexMetrics metrics = 1; +} + +// MonoVertexDaemonService is a grpc service that is used to provide APIs for giving any MonoVertex information. +service MonoVertexDaemonService { + + rpc GetMonoVertexMetrics (google.protobuf.Empty) returns (GetMonoVertexMetricsResponse) { + option (google.api.http).get = "/api/v1/metrics"; + }; + +} \ No newline at end of file diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go b/pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go new file mode 100644 index 0000000000..1dd50188d7 --- /dev/null +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go @@ -0,0 +1,126 @@ +// +//Copyright 2022 The Numaproj Authors. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.27.2 +// source: pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto + +package mvtxdaemon + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + MonoVertexDaemonService_GetMonoVertexMetrics_FullMethodName = "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexMetrics" +) + +// MonoVertexDaemonServiceClient is the client API for MonoVertexDaemonService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MonoVertexDaemonServiceClient interface { + GetMonoVertexMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMonoVertexMetricsResponse, error) +} + +type monoVertexDaemonServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewMonoVertexDaemonServiceClient(cc grpc.ClientConnInterface) MonoVertexDaemonServiceClient { + return &monoVertexDaemonServiceClient{cc} +} + +func (c *monoVertexDaemonServiceClient) GetMonoVertexMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMonoVertexMetricsResponse, error) { + out := new(GetMonoVertexMetricsResponse) + err := c.cc.Invoke(ctx, MonoVertexDaemonService_GetMonoVertexMetrics_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// MonoVertexDaemonServiceServer is the server API for MonoVertexDaemonService service. +// All implementations must embed UnimplementedMonoVertexDaemonServiceServer +// for forward compatibility +type MonoVertexDaemonServiceServer interface { + GetMonoVertexMetrics(context.Context, *emptypb.Empty) (*GetMonoVertexMetricsResponse, error) + mustEmbedUnimplementedMonoVertexDaemonServiceServer() +} + +// UnimplementedMonoVertexDaemonServiceServer must be embedded to have forward compatible implementations. +type UnimplementedMonoVertexDaemonServiceServer struct { +} + +func (UnimplementedMonoVertexDaemonServiceServer) GetMonoVertexMetrics(context.Context, *emptypb.Empty) (*GetMonoVertexMetricsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMonoVertexMetrics not implemented") +} +func (UnimplementedMonoVertexDaemonServiceServer) mustEmbedUnimplementedMonoVertexDaemonServiceServer() { +} + +// UnsafeMonoVertexDaemonServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MonoVertexDaemonServiceServer will +// result in compilation errors. +type UnsafeMonoVertexDaemonServiceServer interface { + mustEmbedUnimplementedMonoVertexDaemonServiceServer() +} + +func RegisterMonoVertexDaemonServiceServer(s grpc.ServiceRegistrar, srv MonoVertexDaemonServiceServer) { + s.RegisterService(&MonoVertexDaemonService_ServiceDesc, srv) +} + +func _MonoVertexDaemonService_GetMonoVertexMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MonoVertexDaemonServiceServer).GetMonoVertexMetrics(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: MonoVertexDaemonService_GetMonoVertexMetrics_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MonoVertexDaemonServiceServer).GetMonoVertexMetrics(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +// MonoVertexDaemonService_ServiceDesc is the grpc.ServiceDesc for MonoVertexDaemonService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var MonoVertexDaemonService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "mvtxdaemon.MonoVertexDaemonService", + HandlerType: (*MonoVertexDaemonServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetMonoVertexMetrics", + Handler: _MonoVertexDaemonService_GetMonoVertexMetrics_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto", +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index be3b34aff7..2b7b68ba50 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -28,7 +28,7 @@ const ( LabelISBService = "isbsvc" LabelPipeline = "pipeline" LabelVertex = "vertex" - LabelMonoVertex = "mono_vertex" + LabelMonoVertex = "mvtx" LabelVertexReplicaIndex = "replica" LabelVertexType = "vertex_type" LabelPartitionName = "partition_name" diff --git a/pkg/mvtxdaemon/client/doc.go b/pkg/mvtxdaemon/client/doc.go new file mode 100644 index 0000000000..8e7e39073b --- /dev/null +++ b/pkg/mvtxdaemon/client/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package client is used to create the MonoVertex daemon service client. +// +// There are 2 clients available. +// +// 1. gRPC client +// func NewGRPCClient(address string) (MonoVertexDaemonClient, error) +// +// 2. RESTful client +// func NewRESTfulClient(address string) (MonoVertexDaemonClient, error) +package client diff --git a/pkg/mvtxdaemon/client/grpc_client.go b/pkg/mvtxdaemon/client/grpc_client.go new file mode 100644 index 0000000000..c7fb80d6fc --- /dev/null +++ b/pkg/mvtxdaemon/client/grpc_client.go @@ -0,0 +1,63 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "crypto/tls" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/numaproj/numaflow/pkg/apis/proto/mvtxdaemon" +) + +type grpcClient struct { + client mvtxdaemon.MonoVertexDaemonServiceClient + conn *grpc.ClientConn +} + +var _ MonoVertexDaemonClient = (*grpcClient)(nil) + +func NewGRPCClient(address string) (MonoVertexDaemonClient, error) { + config := &tls.Config{ + InsecureSkipVerify: true, + } + conn, err := grpc.Dial(address, grpc.WithTransportCredentials(credentials.NewTLS(config))) + if err != nil { + return nil, err + } + daemonClient := mvtxdaemon.NewMonoVertexDaemonServiceClient(conn) + return &grpcClient{conn: conn, client: daemonClient}, nil +} + +func (dc *grpcClient) GetMonoVertexMetrics(ctx context.Context) (*mvtxdaemon.MonoVertexMetrics, error) { + if rspn, err := dc.client.GetMonoVertexMetrics(ctx, &emptypb.Empty{}); err != nil { + return nil, err + } else { + return rspn.Metrics, nil + } +} + +// Close function closes the gRPC connection, it has to be called after a daemon client has finished all its jobs. +func (dc *grpcClient) Close() error { + if dc.conn != nil { + return dc.conn.Close() + } + return nil +} diff --git a/pkg/mvtxdaemon/client/grpc_client_test.go b/pkg/mvtxdaemon/client/grpc_client_test.go new file mode 100644 index 0000000000..b11582d4fd --- /dev/null +++ b/pkg/mvtxdaemon/client/grpc_client_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client diff --git a/pkg/mvtxdaemon/client/interface.go b/pkg/mvtxdaemon/client/interface.go new file mode 100644 index 0000000000..71a1a4aeaf --- /dev/null +++ b/pkg/mvtxdaemon/client/interface.go @@ -0,0 +1,29 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "io" + + "github.com/numaproj/numaflow/pkg/apis/proto/mvtxdaemon" +) + +type MonoVertexDaemonClient interface { + io.Closer + GetMonoVertexMetrics(ctx context.Context) (*mvtxdaemon.MonoVertexMetrics, error) +} diff --git a/pkg/mvtxdaemon/client/restful_client.go b/pkg/mvtxdaemon/client/restful_client.go new file mode 100644 index 0000000000..7409e19734 --- /dev/null +++ b/pkg/mvtxdaemon/client/restful_client.go @@ -0,0 +1,91 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + + "github.com/numaproj/numaflow/pkg/apis/proto/mvtxdaemon" +) + +var ( + // Use JSONPb to unmarshal the response, it is needed to unmarshal the response with google.protobuf.* data types. + jsonMarshaller = new(runtime.JSONPb) +) + +type restfulClient struct { + hostURL string + httpClient *http.Client +} + +var _ MonoVertexDaemonClient = (*restfulClient)(nil) + +func NewRESTfulClient(address string) (MonoVertexDaemonClient, error) { + if !strings.HasPrefix(address, "https://") { + address = "https://" + address + } + client := &restfulClient{ + hostURL: address, + httpClient: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: time.Second * 1, + }, + } + return client, nil +} + +func (rc *restfulClient) Close() error { + return nil +} + +func unmarshalResponse[T any](r *http.Response) (*T, error) { + if r.StatusCode >= 300 { + return nil, fmt.Errorf("unexpected response %v: %s", r.StatusCode, r.Status) + } + data, err := io.ReadAll(r.Body) + if err != nil { + return nil, fmt.Errorf("failed to read data from response body, %w", err) + } + var t T + if err := jsonMarshaller.Unmarshal(data, &t); err != nil { + return nil, fmt.Errorf("failed to unmarshal response body to %T, %w", t, err) + } + return &t, nil +} + +func (rc *restfulClient) GetMonoVertexMetrics(ctx context.Context) (*mvtxdaemon.MonoVertexMetrics, error) { + resp, err := rc.httpClient.Get(fmt.Sprintf("%s/api/v1/metrics", rc.hostURL)) + if err != nil { + return nil, fmt.Errorf("failed to call get mono vertex metrics RESTful API, %w", err) + } + defer func() { _ = resp.Body.Close() }() + if res, err := unmarshalResponse[mvtxdaemon.GetMonoVertexMetricsResponse](resp); err != nil { + return nil, err + } else { + return res.Metrics, nil + } +} diff --git a/pkg/mvtxdaemon/client/restful_client_test.go b/pkg/mvtxdaemon/client/restful_client_test.go new file mode 100644 index 0000000000..b11582d4fd --- /dev/null +++ b/pkg/mvtxdaemon/client/restful_client_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client diff --git a/pkg/mvtxdaemon/server/daemon_server.go b/pkg/mvtxdaemon/server/daemon_server.go index 29553493e6..87931c2e67 100644 --- a/pkg/mvtxdaemon/server/daemon_server.go +++ b/pkg/mvtxdaemon/server/daemon_server.go @@ -38,6 +38,8 @@ import ( "github.com/numaproj/numaflow" "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/apis/proto/daemon" + "github.com/numaproj/numaflow/pkg/apis/proto/mvtxdaemon" + "github.com/numaproj/numaflow/pkg/mvtxdaemon/server/service" "github.com/numaproj/numaflow/pkg/shared/logging" sharedtls "github.com/numaproj/numaflow/pkg/shared/tls" ) @@ -90,7 +92,7 @@ func (ds *daemonServer) Run(ctx context.Context) error { go func() { _ = tcpm.Serve() }() version := numaflow.GetVersion() - mono_vertex_info.WithLabelValues(version.Version, version.Platform, ds.monoVtx.Name).Set(1) + monoVertexInfo.WithLabelValues(version.Version, version.Platform, ds.monoVtx.Name).Set(1) log.Infof("MonoVertex daemon server started successfully on %s", address) <-ctx.Done() @@ -111,6 +113,11 @@ func (ds *daemonServer) newGRPCServer() (*grpc.Server, error) { } grpcServer := grpc.NewServer(sOpts...) grpc_prometheus.Register(grpcServer) + mvtxService, err := service.NewMoveVertexService(ds.monoVtx) + if err != nil { + return nil, err + } + mvtxdaemon.RegisterMonoVertexDaemonServiceServer(grpcServer, mvtxService) return grpcServer, nil } diff --git a/pkg/mvtxdaemon/server/metrics.go b/pkg/mvtxdaemon/server/metrics.go index f0aa155c31..01f3226910 100644 --- a/pkg/mvtxdaemon/server/metrics.go +++ b/pkg/mvtxdaemon/server/metrics.go @@ -24,8 +24,8 @@ import ( ) var ( - mono_vertex_info = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Subsystem: "monovtx", + monoVertexInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "mvtx", Name: "build_info", Help: "A metric with a constant value '1', labeled by Numaflow binary version and platform, as well as the mono vertex name", }, []string{metrics.LabelVersion, metrics.LabelPlatform, metrics.LabelMonoVertex}) diff --git a/pkg/mvtxdaemon/server/service/mvtx_service.go b/pkg/mvtxdaemon/server/service/mvtx_service.go new file mode 100644 index 0000000000..8bc3e1b3ed --- /dev/null +++ b/pkg/mvtxdaemon/server/service/mvtx_service.go @@ -0,0 +1,112 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package service + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "time" + + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/apis/proto/mvtxdaemon" + "github.com/numaproj/numaflow/pkg/metrics" + "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/prometheus/common/expfmt" +) + +type MoveVertexService struct { + mvtxdaemon.UnimplementedMonoVertexDaemonServiceServer + monoVtx *v1alpha1.MonoVertex + httpClient *http.Client + // TODO: add rater + // rater rater.Ratable +} + +var _ mvtxdaemon.MonoVertexDaemonServiceServer = (*MoveVertexService)(nil) + +// NewMoveVertexService returns a new instance of MoveVertexService +func NewMoveVertexService( + monoVtx *v1alpha1.MonoVertex, +) (*MoveVertexService, error) { + mv := MoveVertexService{ + monoVtx: monoVtx, + httpClient: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: time.Second * 3, + }, + } + return &mv, nil +} + +func (mvs *MoveVertexService) GetMonoVertexMetrics(ctx context.Context, empty *emptypb.Empty) (*mvtxdaemon.GetMonoVertexMetricsResponse, error) { + resp := new(mvtxdaemon.GetMonoVertexMetricsResponse) + metrics := new(mvtxdaemon.MonoVertexMetrics) + metrics.MonoVertex = mvs.monoVtx.Name + metrics.Pendings = mvs.getPending(ctx) + // TODO: add processing rate + resp.Metrics = metrics + return resp, nil +} + +// getPending returns the pending count for the mono vertex +func (mvs *MoveVertexService) getPending(ctx context.Context) map[string]*wrapperspb.Int64Value { + log := logging.FromContext(ctx) + headlessServiceName := mvs.monoVtx.GetHeadlessServiceName() + pendingMap := make(map[string]*wrapperspb.Int64Value) + + // Get the headless service name + // We can query the metrics endpoint of the (i)th pod to obtain this value. + // example for 0th pod : https://simple-mono-vertex-mv-0.simple-mono-vertex-mv-headless:2469/metrics + url := fmt.Sprintf("https://%s-mv-1.%s.%s.svc:%v/metrics", mvs.monoVtx.Name, headlessServiceName, mvs.monoVtx.Namespace, v1alpha1.MonoVertexMetricsPort) + if res, err := mvs.httpClient.Get(url); err != nil { + log.Debugf("Error reading the metrics endpoint, it might be because of mono vertex scaling down to 0: %f", err.Error()) + return nil + } else { + // expfmt Parser from prometheus to parse the metrics + textParser := expfmt.TextParser{} + result, err := textParser.TextToMetricFamilies(res.Body) + if err != nil { + log.Errorw("Error in parsing to prometheus metric families", zap.Error(err)) + return nil + } + + // Get the pending messages + if value, ok := result["TODO:metric-name"]; ok { // TODO: metric name + metricsList := value.GetMetric() + for _, metric := range metricsList { + labels := metric.GetLabel() + lookback := "" + for _, label := range labels { + if label.GetName() == metrics.LabelPeriod { + lookback = label.GetValue() + break + } + } + pendingMap[lookback] = wrapperspb.Int64(int64(metric.Gauge.GetValue())) + } + } + } + return pendingMap +} diff --git a/pkg/reconciler/cmd/start.go b/pkg/reconciler/cmd/start.go index f7fa5f295f..0565aefad1 100644 --- a/pkg/reconciler/cmd/start.go +++ b/pkg/reconciler/cmd/start.go @@ -40,6 +40,7 @@ import ( "github.com/numaproj/numaflow/pkg/reconciler" isbsvcctrl "github.com/numaproj/numaflow/pkg/reconciler/isbsvc" monovtxctrl "github.com/numaproj/numaflow/pkg/reconciler/monovertex" + mvtxscaling "github.com/numaproj/numaflow/pkg/reconciler/monovertex/scaling" plctrl "github.com/numaproj/numaflow/pkg/reconciler/pipeline" vertexctrl "github.com/numaproj/numaflow/pkg/reconciler/vertex" "github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling" @@ -237,8 +238,9 @@ func Start(namespaced bool, managedNamespace string) { } // MonoVertex controller + mvtxAutoscaler := mvtxscaling.NewScaler(mgr.GetClient(), mvtxscaling.WithWorkers(20)) monoVertexController, err := controller.New(dfv1.ControllerMonoVertex, mgr, controller.Options{ - Reconciler: monovtxctrl.NewReconciler(mgr.GetClient(), mgr.GetScheme(), config, image, logger, mgr.GetEventRecorderFor(dfv1.ControllerMonoVertex)), + Reconciler: monovtxctrl.NewReconciler(mgr.GetClient(), mgr.GetScheme(), config, image, mvtxAutoscaler, logger, mgr.GetEventRecorderFor(dfv1.ControllerMonoVertex)), }) if err != nil { logger.Fatalw("Unable to set up MonoVertex controller", zap.Error(err)) @@ -276,9 +278,14 @@ func Start(namespaced bool, managedNamespace string) { logger.Fatalw("Unable to watch Deployments", zap.Error(err)) } - // Add autoscaling runner + // Add Vertex autoscaling runner if err := mgr.Add(LeaderElectionRunner(autoscaler.Start)); err != nil { - logger.Fatalw("Unable to add autoscaling runner", zap.Error(err)) + logger.Fatalw("Unable to add Vertex autoscaling runner", zap.Error(err)) + } + + // Add MonoVertex autoscaling runner + if err := mgr.Add(LeaderElectionRunner(mvtxAutoscaler.Start)); err != nil { + logger.Fatalw("Unable to add MonoVertex autoscaling runner", zap.Error(err)) } version := numaflow.GetVersion() diff --git a/pkg/reconciler/metrics.go b/pkg/reconciler/metrics.go index 37d4cc5148..7084108281 100644 --- a/pkg/reconciler/metrics.go +++ b/pkg/reconciler/metrics.go @@ -45,6 +45,12 @@ var ( Help: "A metric to indicate whether the Pipeline is healthy. '1' means healthy, '0' means unhealthy", }, []string{metrics.LabelNamespace, metrics.LabelISBService}) + MonoVertexHealth = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "controller", + Name: "mvtx_health", + Help: "A metric to indicate whether the MonoVertex is healthy. '1' means healthy, '0' means unhealthy", + }, []string{metrics.LabelNamespace, metrics.LabelMonoVertex}) + // JetStreamISBSvcReplicas indicates the replicas of a JetStream ISB Service. JetStreamISBSvcReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "controller", @@ -72,8 +78,25 @@ var ( Name: "vertex_current_replicas", Help: "A metric indicates the current replicas of a Vertex", }, []string{metrics.LabelNamespace, metrics.LabelPipeline, metrics.LabelVertex}) + + // MonoVertexDisiredReplicas indicates the desired replicas of a MonoVertex. + MonoVertexDisiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "controller", + Name: "mvtx_desired_replicas", + Help: "A metric indicates the desired replicas of a MonoVertex", + }, []string{metrics.LabelNamespace, metrics.LabelMonoVertex}) + + // MonoVertexCurrentReplicas indicates the current replicas of a MonoVertex. + MonoVertexCurrentReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "controller", + Name: "mvtx_current_replicas", + Help: "A metric indicates the current replicas of a MonoVertex", + }, []string{metrics.LabelNamespace, metrics.LabelMonoVertex}) ) func init() { - ctrlmetrics.Registry.MustRegister(BuildInfo, ISBSvcHealth, PipelineHealth, JetStreamISBSvcReplicas, RedisISBSvcReplicas, VertexDisiredReplicas, VertexCurrentReplicas) + ctrlmetrics.Registry.MustRegister(BuildInfo, ISBSvcHealth, PipelineHealth, + MonoVertexHealth, JetStreamISBSvcReplicas, RedisISBSvcReplicas, + VertexDisiredReplicas, VertexCurrentReplicas, MonoVertexDisiredReplicas, + MonoVertexCurrentReplicas) } diff --git a/pkg/reconciler/monovertex/controller.go b/pkg/reconciler/monovertex/controller.go index 3688d87b60..28122b485c 100644 --- a/pkg/reconciler/monovertex/controller.go +++ b/pkg/reconciler/monovertex/controller.go @@ -39,6 +39,7 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/reconciler" + mvtxscaling "github.com/numaproj/numaflow/pkg/reconciler/monovertex/scaling" "github.com/numaproj/numaflow/pkg/shared/logging" sharedutil "github.com/numaproj/numaflow/pkg/shared/util" ) @@ -52,11 +53,12 @@ type monoVertexReconciler struct { image string logger *zap.SugaredLogger + scaler *mvtxscaling.Scaler recorder record.EventRecorder } -func NewReconciler(client client.Client, scheme *runtime.Scheme, config *reconciler.GlobalConfig, image string, logger *zap.SugaredLogger, recorder record.EventRecorder) reconcile.Reconciler { - return &monoVertexReconciler{client: client, scheme: scheme, config: config, image: image, logger: logger, recorder: recorder} +func NewReconciler(client client.Client, scheme *runtime.Scheme, config *reconciler.GlobalConfig, image string, scaler *mvtxscaling.Scaler, logger *zap.SugaredLogger, recorder record.EventRecorder) reconcile.Reconciler { + return &monoVertexReconciler{client: client, scheme: scheme, config: config, image: image, scaler: scaler, logger: logger, recorder: recorder} } func (mr *monoVertexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -87,13 +89,28 @@ func (mr *monoVertexReconciler) Reconcile(ctx context.Context, req ctrl.Request) // reconcile does the real logic. func (mr *monoVertexReconciler) reconcile(ctx context.Context, monoVtx *dfv1.MonoVertex) (ctrl.Result, error) { log := logging.FromContext(ctx) + mVtxKey := mvtxscaling.KeyOfMonoVertex(*monoVtx) if !monoVtx.DeletionTimestamp.IsZero() { log.Info("Deleting mono vertex") + mr.scaler.StopWatching(mVtxKey) + // Clean up metrics + _ = reconciler.MonoVertexHealth.DeleteLabelValues(monoVtx.Namespace, monoVtx.Name) + _ = reconciler.MonoVertexDisiredReplicas.DeleteLabelValues(monoVtx.Namespace, monoVtx.Name) + _ = reconciler.MonoVertexCurrentReplicas.DeleteLabelValues(monoVtx.Namespace, monoVtx.Name) return ctrl.Result{}, nil } - monoVtx.Status.SetObservedGeneration(monoVtx.Generation) + // Set metrics + defer func() { + if monoVtx.Status.IsHealthy() { + reconciler.MonoVertexHealth.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(1) + } else { + reconciler.MonoVertexHealth.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(0) + } + }() + monoVtx.Status.SetObservedGeneration(monoVtx.Generation) + mr.scaler.StartWatching(mVtxKey) // TODO: handle lifecycle changes // Regular mono vertex change @@ -137,13 +154,19 @@ func (mr *monoVertexReconciler) reconcileNonLifecycleChanges(ctx context.Context } func (mr *monoVertexReconciler) reconcilePods(ctx context.Context, monoVtx *dfv1.MonoVertex) error { + desiredReplicas := monoVtx.GetReplicas() + // Set metrics + defer func() { + reconciler.MonoVertexDisiredReplicas.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(float64(desiredReplicas)) + reconciler.MonoVertexCurrentReplicas.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(float64(monoVtx.Status.Replicas)) + }() + log := logging.FromContext(ctx) existingPods, err := mr.findExistingPods(ctx, monoVtx) if err != nil { mr.markDeploymentFailedAndLogEvent(monoVtx, false, log, "FindExistingPodFailed", err.Error(), "Failed to find existing mono vertex pods", zap.Error(err)) return err } - desiredReplicas := monoVtx.GetReplicas() for replica := 0; replica < desiredReplicas; replica++ { podSpec, err := mr.buildPodSpec(monoVtx) if err != nil { diff --git a/pkg/reconciler/monovertex/scaling/doc.go b/pkg/reconciler/monovertex/scaling/doc.go new file mode 100644 index 0000000000..9a3d2ba933 --- /dev/null +++ b/pkg/reconciler/monovertex/scaling/doc.go @@ -0,0 +1,25 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package scaling provides the autoscaling capability for MonoVertex objects. +// +// A workqueue is implemented in this package to watch monovertices in the cluster, +// calculate the desired replica number for each of them periodically, and +// patch the MonoVertex spec. +// +// Function StartWatching() and StopWatching() are also provided in the package, +// so that monovertices can be added into and removed from the workqueue. +package scaling diff --git a/pkg/reconciler/monovertex/scaling/options.go b/pkg/reconciler/monovertex/scaling/options.go new file mode 100644 index 0000000000..10ed212181 --- /dev/null +++ b/pkg/reconciler/monovertex/scaling/options.go @@ -0,0 +1,57 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaling + +type options struct { + // Number of workers working on autoscaling. + workers int + // Time in milliseconds, each element in the work queue will be picked up in an interval of this period of time. + taskInterval int + // size of the daemon clients cache. + clientsCacheSize int +} + +type Option func(*options) + +func defaultOptions() *options { + return &options{ + workers: 20, + taskInterval: 30000, + clientsCacheSize: 500, + } +} + +// WithWorkers sets the number of workers working on autoscaling. +func WithWorkers(n int) Option { + return func(o *options) { + o.workers = n + } +} + +// WithTaskInterval sets the interval of picking up a task from the work queue. +func WithTaskInterval(n int) Option { + return func(o *options) { + o.taskInterval = n + } +} + +// WithClientsCacheSize sets the size of the daemon clients cache. +func WithClientsCacheSize(n int) Option { + return func(o *options) { + o.clientsCacheSize = n + } +} diff --git a/pkg/reconciler/monovertex/scaling/scaling.go b/pkg/reconciler/monovertex/scaling/scaling.go new file mode 100644 index 0000000000..15fa518e03 --- /dev/null +++ b/pkg/reconciler/monovertex/scaling/scaling.go @@ -0,0 +1,366 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaling + +import ( + "container/list" + "context" + "encoding/json" + "fmt" + "math" + "strings" + "sync" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "go.uber.org/zap" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + "github.com/numaproj/numaflow/pkg/isb" + mvtxdaemonclient "github.com/numaproj/numaflow/pkg/mvtxdaemon/client" + "github.com/numaproj/numaflow/pkg/shared/logging" +) + +type Scaler struct { + client client.Client + monoVtxMap map[string]*list.Element + // List of the mono vertex namespaced name, format is "namespace/name" + monoVtxList *list.List + lock *sync.RWMutex + options *options + // Cache to store the vertex metrics such as pending message number + monoVtxMetricsCache *lru.Cache[string, int64] + mvtxDaemonClientsCache *lru.Cache[string, mvtxdaemonclient.MonoVertexDaemonClient] +} + +// NewScaler returns a Scaler instance. +func NewScaler(client client.Client, opts ...Option) *Scaler { + scalerOpts := defaultOptions() + for _, opt := range opts { + if opt != nil { + opt(scalerOpts) + } + } + s := &Scaler{ + client: client, + options: scalerOpts, + monoVtxMap: make(map[string]*list.Element), + monoVtxList: list.New(), + lock: new(sync.RWMutex), + } + // cache the clients + s.mvtxDaemonClientsCache, _ = lru.NewWithEvict[string, mvtxdaemonclient.MonoVertexDaemonClient](s.options.clientsCacheSize, func(key string, value mvtxdaemonclient.MonoVertexDaemonClient) { + _ = value.Close() + }) + monoVtxMetricsCache, _ := lru.New[string, int64](10000) + s.monoVtxMetricsCache = monoVtxMetricsCache + return s +} + +// Contains returns if the Scaler contains the key. +func (s *Scaler) Contains(key string) bool { + s.lock.RLock() + defer s.lock.RUnlock() + _, ok := s.monoVtxMap[key] + return ok +} + +// Length returns how many vertices are being watched for autoscaling +func (s *Scaler) Length() int { + s.lock.RLock() + defer s.lock.RUnlock() + return s.monoVtxList.Len() +} + +// StartWatching put a key (namespace/name) into the Scaler +func (s *Scaler) StartWatching(key string) { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.monoVtxMap[key]; !ok { + s.monoVtxMap[key] = s.monoVtxList.PushBack(key) + } +} + +// StopWatching stops autoscaling on the key (namespace/name) +func (s *Scaler) StopWatching(key string) { + s.lock.Lock() + defer s.lock.Unlock() + if e, ok := s.monoVtxMap[key]; ok { + _ = s.monoVtxList.Remove(e) + delete(s.monoVtxMap, key) + } +} + +// Function scale() defines each of the worker's job. +// It waits for keys in the channel, and starts a scaling job +func (s *Scaler) scale(ctx context.Context, id int, keyCh <-chan string) { + log := logging.FromContext(ctx) + log.Infof("Started MonoVertex autoscaling worker %v", id) + for { + select { + case <-ctx.Done(): + log.Infof("Stopped MonoVertex autoscaling worker %v", id) + return + case key := <-keyCh: + if err := s.scaleOneMonoVertex(ctx, key, id); err != nil { + log.Errorw("Failed to scale a MonoVertex", zap.String("monoVtx", key), zap.Error(err)) + } + } + } +} + +// scaleOneMonoVertex implements the detailed logic of scaling up/down a MonoVertex. +// +// desiredReplicas = currentReplicas * pending / (targetProcessingTime * rate) +func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int) error { + log := logging.FromContext(ctx).With("worker", fmt.Sprint(worker)).With("monoVtxKey", key) + log.Debugf("Working on key: %s.", key) + strs := strings.Split(key, "/") + if len(strs) != 2 { + return fmt.Errorf("invalid key %q", key) + } + namespace := strs[0] + monoVtxName := strs[1] + monoVtx := &dfv1.MonoVertex{} + if err := s.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: monoVtxName}, monoVtx); err != nil { + if apierrors.IsNotFound(err) { + s.StopWatching(key) + log.Info("No corresponding MonoVertex found, stopped watching.") + return nil + } + return fmt.Errorf("failed to query MonoVertex object of key %q, %w", key, err) + } + if !monoVtx.GetDeletionTimestamp().IsZero() { + s.StopWatching(key) + log.Debug("MonoVertex being deleted.") + return nil + } + if !monoVtx.Scalable() { + s.StopWatching(key) // Remove it in case it's watched. + return nil + } + secondsSinceLastScale := time.Since(monoVtx.Status.LastScaledAt.Time).Seconds() + scaleDownCooldown := float64(monoVtx.Spec.Scale.GetScaleDownCooldownSeconds()) + scaleUpCooldown := float64(monoVtx.Spec.Scale.GetScaleUpCooldownSeconds()) + if secondsSinceLastScale < scaleDownCooldown && secondsSinceLastScale < scaleUpCooldown { + // Skip scaling without needing further calculation + log.Infof("Cooldown period, skip scaling.") + return nil + } + if monoVtx.Status.Phase != dfv1.MonoVertexPhaseRunning { + log.Infof("MonoVertex not in Running phase, skip scaling.") + return nil + } + // TODO: lifecycle + // if monoVtx.Spec.Lifecycle.GetDesiredPhase() != dfv1.MonoVertexPhaseRunning { + // log.Info("MonoVertex is pausing, skip scaling.") + // return nil + // } + if int(monoVtx.Status.Replicas) != monoVtx.GetReplicas() { + log.Infof("MonoVertex %s might be under processing, replicas mismatch, skip scaling.", monoVtx.Name) + return nil + } + + var err error + daemonClient, _ := s.mvtxDaemonClientsCache.Get(monoVtx.GetDaemonServiceURL()) + if daemonClient == nil { + daemonClient, err = mvtxdaemonclient.NewGRPCClient(monoVtx.GetDaemonServiceURL()) + if err != nil { + return fmt.Errorf("failed to get daemon service client for MonoVertex %s, %w", monoVtx.Name, err) + } + s.mvtxDaemonClientsCache.Add(monoVtx.GetDaemonServiceURL(), daemonClient) + } + + if monoVtx.Status.Replicas == 0 { // Was scaled to 0 + // Periodically wake them up from 0 replicas to 1, to peek for the incoming messages + if secondsSinceLastScale >= float64(monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) { + log.Infof("MonoVertex %s has slept %v seconds, scaling up to peek.", monoVtx.Name, secondsSinceLastScale) + return s.patchMonoVertexReplicas(ctx, monoVtx, 1) + } else { + log.Infof("MonoVertex %q has slept %v seconds, hasn't reached zeroReplicaSleepSeconds (%v seconds), skip scaling.", monoVtx.Name, secondsSinceLastScale, monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) + return nil + } + } + + vMetrics, err := daemonClient.GetMonoVertexMetrics(ctx) + if err != nil { + return fmt.Errorf("failed to get metrics of mono vertex key %q, %w", key, err) + } + totalRate := float64(0) + totalPending := int64(0) + rate, existing := vMetrics.ProcessingRates["default"] + // If rate is not available, we skip scaling. + if !existing || rate.GetValue() < 0 { // Rate not available + log.Infof("MonoVertex %s has no rate information, skip scaling.", monoVtxName) + return nil + } else { + totalRate = rate.GetValue() + } + pending, existing := vMetrics.Pendings["default"] + if !existing || pending.GetValue() < 0 || pending.GetValue() == isb.PendingNotAvailable { + // Pending not available, we don't do anything + log.Infof("MonoVertex %s has no pending messages information, skip scaling.", monoVtxName) + return nil + } else { + totalPending = pending.GetValue() + } + + desired := s.desiredReplicas(ctx, monoVtx, totalRate, totalPending) + log.Infof("Calculated desired replica number of MonoVertex %q is: %d.", monoVtx.Name, desired) + max := monoVtx.Spec.Scale.GetMaxReplicas() + min := monoVtx.Spec.Scale.GetMinReplicas() + if desired > max { + desired = max + log.Infof("Calculated desired replica number %d of MonoVertex %q is greater than max, using max %d.", monoVtxName, desired, max) + } + if desired < min { + desired = min + log.Infof("Calculated desired replica number %d of MonoVertex %q is smaller than min, using min %d.", monoVtxName, desired, min) + } + current := int32(monoVtx.GetReplicas()) + if current > max || current < min { // Someone might have manually scaled up/down the MonoVertex + return s.patchMonoVertexReplicas(ctx, monoVtx, desired) + } + maxAllowed := int32(monoVtx.Spec.Scale.GetReplicasPerScale()) + if desired < current { + diff := current - desired + if diff > maxAllowed { + diff = maxAllowed + } + if secondsSinceLastScale < scaleDownCooldown { + log.Infof("Cooldown period for scaling down, skip scaling.") + return nil + } + return s.patchMonoVertexReplicas(ctx, monoVtx, current-diff) // We scale down gradually + } + if desired > current { + diff := desired - current + if diff > maxAllowed { + diff = maxAllowed + } + if secondsSinceLastScale < scaleUpCooldown { + log.Infof("Cooldown period for scaling up, skip scaling.") + return nil + } + return s.patchMonoVertexReplicas(ctx, monoVtx, current+diff) // We scale up gradually + } + return nil +} + +func (s *Scaler) desiredReplicas(_ context.Context, monoVtx *dfv1.MonoVertex, processingRate float64, pending int64) int32 { + // Since pending contains the pending acks, if both totalRate and totalPending are 0, we scale down to 0 + if pending == 0 && processingRate == 0 { + return 0 + } + if processingRate == 0 { // Something is wrong, we don't do anything. + return int32(monoVtx.Status.Replicas) + } + + var desired int32 + // We calculate the time of finishing processing the pending messages, + // and then we know how many replicas are needed to get them done in target seconds. + desired = int32(math.Round(((float64(pending) / processingRate) / float64(monoVtx.Spec.Scale.GetTargetProcessingSeconds())) * float64(monoVtx.Status.Replicas))) + + // we only scale down to zero when the pending and rate are both zero. + if desired == 0 { + desired = 1 + } + if desired > int32(pending) { // For some corner cases, we don't want to scale up to more than pending. + desired = int32(pending) + } + return desired +} + +// Start function starts the autoscaling worker group. +// Each worker keeps picking up scaling tasks (which contains mono vertex keys) to calculate the desired replicas, +// and patch the mono vertex spec with the new replica number if needed. +func (s *Scaler) Start(ctx context.Context) error { + log := logging.FromContext(ctx).Named("mvtx-autoscaler") + log.Info("Starting MonoVertex autoscaler...") + keyCh := make(chan string) + ctx, cancel := context.WithCancel(logging.WithLogger(ctx, log)) + defer cancel() + // Worker group + for i := 1; i <= s.options.workers; i++ { + go s.scale(ctx, i, keyCh) + } + + // Function assign() moves an element in the list from the front to the back, + // and send to the channel so that it can be picked up by a worker. + assign := func() { + s.lock.Lock() + defer s.lock.Unlock() + if s.monoVtxList.Len() == 0 { + return + } + e := s.monoVtxList.Front() + if key, ok := e.Value.(string); ok { + s.monoVtxList.MoveToBack(e) + keyCh <- key + } + } + + // Following for loop keeps calling assign() function to assign scaling tasks to the workers. + // It makes sure each element in the list will be assigned every N milliseconds. + for { + select { + case <-ctx.Done(): + log.Info("Shutting down mono vertex autoscaling job assigner.") + // clear the daemon clients cache + s.mvtxDaemonClientsCache.Purge() + return nil + default: + assign() + } + // Make sure each of the key will be assigned at most every N milliseconds. + time.Sleep(time.Millisecond * time.Duration(func() int { + l := s.Length() + if l == 0 { + return s.options.taskInterval + } + result := s.options.taskInterval / l + if result > 0 { + return result + } + return 1 + }())) + } +} + +func (s *Scaler) patchMonoVertexReplicas(ctx context.Context, monoVtx *dfv1.MonoVertex, desiredReplicas int32) error { + log := logging.FromContext(ctx) + origin := monoVtx.Spec.Replicas + monoVtx.Spec.Replicas = ptr.To[int32](desiredReplicas) + body, err := json.Marshal(monoVtx) + if err != nil { + return fmt.Errorf("failed to marshal MonoVertex object to json, %w", err) + } + if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to patch MonoVertex replicas, %w", err) + } + log.Infow("Auto scaling - mono vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", monoVtx.Namespace), zap.String("vertex", monoVtx.Name)) + return nil +} + +// KeyOfMonoVertex returns the unique key of a MonoVertex +func KeyOfMonoVertex(monoVtx dfv1.MonoVertex) string { + return fmt.Sprintf("%s/%s", monoVtx.Namespace, monoVtx.Name) +} diff --git a/pkg/reconciler/monovertex/scaling/scaling_test.go b/pkg/reconciler/monovertex/scaling/scaling_test.go new file mode 100644 index 0000000000..f0823fe1b5 --- /dev/null +++ b/pkg/reconciler/monovertex/scaling/scaling_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scaling diff --git a/pkg/reconciler/vertex/scaling/doc.go b/pkg/reconciler/vertex/scaling/doc.go index a5004a6899..37f3dec1b8 100644 --- a/pkg/reconciler/vertex/scaling/doc.go +++ b/pkg/reconciler/vertex/scaling/doc.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package scaling provides the autoscaling capability for Numaflow. +// Package scaling provides the autoscaling capability for Vertex objects. // // A workqueue is implemented in this package to watch vertices in the cluster, // calculate the desired replica number for each of them periodically, and diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go index 6384e12726..01ecd6b826 100644 --- a/pkg/reconciler/vertex/scaling/scaling.go +++ b/pkg/reconciler/vertex/scaling/scaling.go @@ -113,11 +113,11 @@ func (s *Scaler) StopWatching(key string) { // It waits for keys in the channel, and starts a scaling job func (s *Scaler) scale(ctx context.Context, id int, keyCh <-chan string) { log := logging.FromContext(ctx) - log.Infof("Started autoscaling worker %v", id) + log.Infof("Started Vertex autoscaling worker %v", id) for { select { case <-ctx.Done(): - log.Infof("Stopped scaling worker %v", id) + log.Infof("Stopped Vertex autoscaling worker %v", id) return case key := <-keyCh: if err := s.scaleOneVertex(ctx, key, id); err != nil { @@ -400,8 +400,8 @@ func (s *Scaler) desiredReplicas(_ context.Context, vertex *dfv1.Vertex, partiti // Each worker keeps picking up scaling tasks (which contains vertex keys) to calculate the desired replicas, // and patch the vertex spec with the new replica number if needed. func (s *Scaler) Start(ctx context.Context) error { - log := logging.FromContext(ctx).Named("autoscaler") - log.Info("Starting autoscaler...") + log := logging.FromContext(ctx).Named("vertex-autoscaler") + log.Info("Starting vertex autoscaler...") keyCh := make(chan string) ctx, cancel := context.WithCancel(logging.WithLogger(ctx, log)) defer cancel()