diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.go b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.go index e33c93aa7..d624cf7a2 100644 --- a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.go +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.go @@ -262,6 +262,53 @@ func (x *GetMonoVertexStatusResponse) GetStatus() *MonoVertexStatus { return nil } +type GetMonoVertexLookbackResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Lookback *wrapperspb.DoubleValue `protobuf:"bytes,1,opt,name=lookback,proto3" json:"lookback,omitempty"` +} + +func (x *GetMonoVertexLookbackResponse) Reset() { + *x = GetMonoVertexLookbackResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetMonoVertexLookbackResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetMonoVertexLookbackResponse) ProtoMessage() {} + +func (x *GetMonoVertexLookbackResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetMonoVertexLookbackResponse.ProtoReflect.Descriptor instead. +func (*GetMonoVertexLookbackResponse) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescGZIP(), []int{4} +} + +func (x *GetMonoVertexLookbackResponse) GetLookback() *wrapperspb.DoubleValue { + if x != nil { + return x.Lookback + } + return nil +} + var File_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto protoreflect.FileDescriptor var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDesc = []byte{ @@ -316,27 +363,40 @@ var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDesc = []byte{ 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x32, 0xfc, 0x01, 0x0a, 0x17, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, - 0x74, 0x65, 0x78, 0x44, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x12, 0x71, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, - 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x1a, 0x28, 0x2e, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x47, 0x65, - 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, - 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x17, 0x82, 0xd3, 0xe4, 0x93, - 0x02, 0x11, 0x12, 0x0f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x74, 0x72, - 0x69, 0x63, 0x73, 0x12, 0x6e, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, - 0x72, 0x74, 0x65, 0x78, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x27, 0x2e, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, - 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x16, 0x82, 0xd3, 0xe4, - 0x93, 0x02, 0x10, 0x12, 0x0e, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, - 0x6c, 0x6f, 0x77, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x74, 0x75, 0x73, 0x22, 0x59, 0x0a, 0x1d, 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, + 0x65, 0x72, 0x74, 0x65, 0x78, 0x4c, 0x6f, 0x6f, 0x6b, 0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x08, 0x6c, 0x6f, 0x6f, 0x6b, 0x62, 0x61, 0x63, + 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x6f, 0x75, 0x62, 0x6c, 0x65, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6c, 0x6f, 0x6f, 0x6b, 0x62, 0x61, 0x63, 0x6b, 0x32, + 0xf2, 0x02, 0x0a, 0x17, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x44, 0x61, + 0x65, 0x6d, 0x6f, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x71, 0x0a, 0x14, 0x47, + 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, + 0x69, 0x63, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x28, 0x2e, 0x6d, 0x76, + 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, + 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x17, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x11, 0x12, 0x0f, 0x2f, + 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x6e, + 0x0a, 0x13, 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x27, 0x2e, + 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x6f, + 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x16, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x10, 0x12, 0x0e, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x74, + 0x0a, 0x15, 0x47, 0x65, 0x74, 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4c, + 0x6f, 0x6f, 0x6b, 0x62, 0x61, 0x63, 0x6b, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, + 0x29, 0x2e, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x2e, 0x47, 0x65, 0x74, + 0x4d, 0x6f, 0x6e, 0x6f, 0x56, 0x65, 0x72, 0x74, 0x65, 0x78, 0x4c, 0x6f, 0x6f, 0x6b, 0x62, 0x61, + 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x18, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x12, 0x12, 0x10, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6c, 0x6f, 0x6f, 0x6b, + 0x62, 0x61, 0x63, 0x6b, 0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, + 0x66, 0x6c, 0x6f, 0x77, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x76, 0x74, 0x78, 0x64, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -351,34 +411,38 @@ func file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescGZIP() []byte { return file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDescData } -var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_goTypes = []any{ - (*MonoVertexMetrics)(nil), // 0: mvtxdaemon.MonoVertexMetrics - (*GetMonoVertexMetricsResponse)(nil), // 1: mvtxdaemon.GetMonoVertexMetricsResponse - (*MonoVertexStatus)(nil), // 2: mvtxdaemon.MonoVertexStatus - (*GetMonoVertexStatusResponse)(nil), // 3: mvtxdaemon.GetMonoVertexStatusResponse - nil, // 4: mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry - nil, // 5: mvtxdaemon.MonoVertexMetrics.PendingsEntry - (*wrapperspb.DoubleValue)(nil), // 6: google.protobuf.DoubleValue - (*wrapperspb.Int64Value)(nil), // 7: google.protobuf.Int64Value - (*emptypb.Empty)(nil), // 8: google.protobuf.Empty + (*MonoVertexMetrics)(nil), // 0: mvtxdaemon.MonoVertexMetrics + (*GetMonoVertexMetricsResponse)(nil), // 1: mvtxdaemon.GetMonoVertexMetricsResponse + (*MonoVertexStatus)(nil), // 2: mvtxdaemon.MonoVertexStatus + (*GetMonoVertexStatusResponse)(nil), // 3: mvtxdaemon.GetMonoVertexStatusResponse + (*GetMonoVertexLookbackResponse)(nil), // 4: mvtxdaemon.GetMonoVertexLookbackResponse + nil, // 5: mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry + nil, // 6: mvtxdaemon.MonoVertexMetrics.PendingsEntry + (*wrapperspb.DoubleValue)(nil), // 7: google.protobuf.DoubleValue + (*wrapperspb.Int64Value)(nil), // 8: google.protobuf.Int64Value + (*emptypb.Empty)(nil), // 9: google.protobuf.Empty } var file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_depIdxs = []int32{ - 4, // 0: mvtxdaemon.MonoVertexMetrics.processingRates:type_name -> mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry - 5, // 1: mvtxdaemon.MonoVertexMetrics.pendings:type_name -> mvtxdaemon.MonoVertexMetrics.PendingsEntry - 0, // 2: mvtxdaemon.GetMonoVertexMetricsResponse.metrics:type_name -> mvtxdaemon.MonoVertexMetrics - 2, // 3: mvtxdaemon.GetMonoVertexStatusResponse.status:type_name -> mvtxdaemon.MonoVertexStatus - 6, // 4: mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry.value:type_name -> google.protobuf.DoubleValue - 7, // 5: mvtxdaemon.MonoVertexMetrics.PendingsEntry.value:type_name -> google.protobuf.Int64Value - 8, // 6: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexMetrics:input_type -> google.protobuf.Empty - 8, // 7: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexStatus:input_type -> google.protobuf.Empty - 1, // 8: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexMetrics:output_type -> mvtxdaemon.GetMonoVertexMetricsResponse - 3, // 9: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexStatus:output_type -> mvtxdaemon.GetMonoVertexStatusResponse - 8, // [8:10] is the sub-list for method output_type - 6, // [6:8] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 5, // 0: mvtxdaemon.MonoVertexMetrics.processingRates:type_name -> mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry + 6, // 1: mvtxdaemon.MonoVertexMetrics.pendings:type_name -> mvtxdaemon.MonoVertexMetrics.PendingsEntry + 0, // 2: mvtxdaemon.GetMonoVertexMetricsResponse.metrics:type_name -> mvtxdaemon.MonoVertexMetrics + 2, // 3: mvtxdaemon.GetMonoVertexStatusResponse.status:type_name -> mvtxdaemon.MonoVertexStatus + 7, // 4: mvtxdaemon.GetMonoVertexLookbackResponse.lookback:type_name -> google.protobuf.DoubleValue + 7, // 5: mvtxdaemon.MonoVertexMetrics.ProcessingRatesEntry.value:type_name -> google.protobuf.DoubleValue + 8, // 6: mvtxdaemon.MonoVertexMetrics.PendingsEntry.value:type_name -> google.protobuf.Int64Value + 9, // 7: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexMetrics:input_type -> google.protobuf.Empty + 9, // 8: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexStatus:input_type -> google.protobuf.Empty + 9, // 9: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexLookback:input_type -> google.protobuf.Empty + 1, // 10: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexMetrics:output_type -> mvtxdaemon.GetMonoVertexMetricsResponse + 3, // 11: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexStatus:output_type -> mvtxdaemon.GetMonoVertexStatusResponse + 4, // 12: mvtxdaemon.MonoVertexDaemonService.GetMonoVertexLookback:output_type -> mvtxdaemon.GetMonoVertexLookbackResponse + 10, // [10:13] is the sub-list for method output_type + 7, // [7:10] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_init() } @@ -435,6 +499,18 @@ func file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_init() { return nil } } + file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*GetMonoVertexLookbackResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -442,7 +518,7 @@ func file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_mvtxdaemon_mvtxdaemon_proto_rawDesc, NumEnums: 0, - NumMessages: 6, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.gw.go b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.gw.go index 53badc433..7f46d65dc 100644 --- a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.gw.go +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.pb.gw.go @@ -68,6 +68,24 @@ func local_request_MonoVertexDaemonService_GetMonoVertexStatus_0(ctx context.Con } +func request_MonoVertexDaemonService_GetMonoVertexLookback_0(ctx context.Context, marshaler runtime.Marshaler, client MonoVertexDaemonServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := client.GetMonoVertexLookback(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_MonoVertexDaemonService_GetMonoVertexLookback_0(ctx context.Context, marshaler runtime.Marshaler, server MonoVertexDaemonServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq emptypb.Empty + var metadata runtime.ServerMetadata + + msg, err := server.GetMonoVertexLookback(ctx, &protoReq) + return msg, metadata, err + +} + // RegisterMonoVertexDaemonServiceHandlerServer registers the http handlers for service MonoVertexDaemonService to "mux". // UnaryRPC :call MonoVertexDaemonServiceServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. @@ -124,6 +142,31 @@ func RegisterMonoVertexDaemonServiceHandlerServer(ctx context.Context, mux *runt }) + mux.Handle("GET", pattern_MonoVertexDaemonService_GetMonoVertexLookback_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexLookback", runtime.WithHTTPPathPattern("/api/v1/lookback")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_MonoVertexDaemonService_GetMonoVertexLookback_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_MonoVertexDaemonService_GetMonoVertexLookback_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -209,6 +252,28 @@ func RegisterMonoVertexDaemonServiceHandlerClient(ctx context.Context, mux *runt }) + mux.Handle("GET", pattern_MonoVertexDaemonService_GetMonoVertexLookback_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + var err error + var annotatedContext context.Context + annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexLookback", runtime.WithHTTPPathPattern("/api/v1/lookback")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_MonoVertexDaemonService_GetMonoVertexLookback_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + + forward_MonoVertexDaemonService_GetMonoVertexLookback_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -216,10 +281,14 @@ var ( pattern_MonoVertexDaemonService_GetMonoVertexMetrics_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "v1", "metrics"}, "")) pattern_MonoVertexDaemonService_GetMonoVertexStatus_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "v1", "status"}, "")) + + pattern_MonoVertexDaemonService_GetMonoVertexLookback_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"api", "v1", "lookback"}, "")) ) var ( forward_MonoVertexDaemonService_GetMonoVertexMetrics_0 = runtime.ForwardResponseMessage forward_MonoVertexDaemonService_GetMonoVertexStatus_0 = runtime.ForwardResponseMessage + + forward_MonoVertexDaemonService_GetMonoVertexLookback_0 = runtime.ForwardResponseMessage ) diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto index 21bc215c5..a6dab7164 100644 --- a/pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto @@ -48,6 +48,10 @@ message GetMonoVertexStatusResponse { MonoVertexStatus status = 1; } +message GetMonoVertexLookbackResponse { + google.protobuf.DoubleValue lookback = 1; +} + // MonoVertexDaemonService is a grpc service that is used to provide APIs for giving any MonoVertex information. service MonoVertexDaemonService { @@ -59,4 +63,8 @@ service MonoVertexDaemonService { option (google.api.http).get = "/api/v1/status"; }; + rpc GetMonoVertexLookback (google.protobuf.Empty) returns (GetMonoVertexLookbackResponse) { + option (google.api.http).get = "/api/v1/lookback"; + }; + } \ No newline at end of file diff --git a/pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go b/pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go index 76477c3de..609eae0b9 100644 --- a/pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go +++ b/pkg/apis/proto/mvtxdaemon/mvtxdaemon_grpc.pb.go @@ -35,8 +35,9 @@ import ( const _ = grpc.SupportPackageIsVersion8 const ( - MonoVertexDaemonService_GetMonoVertexMetrics_FullMethodName = "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexMetrics" - MonoVertexDaemonService_GetMonoVertexStatus_FullMethodName = "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexStatus" + MonoVertexDaemonService_GetMonoVertexMetrics_FullMethodName = "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexMetrics" + MonoVertexDaemonService_GetMonoVertexStatus_FullMethodName = "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexStatus" + MonoVertexDaemonService_GetMonoVertexLookback_FullMethodName = "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexLookback" ) // MonoVertexDaemonServiceClient is the client API for MonoVertexDaemonService service. @@ -47,6 +48,7 @@ const ( type MonoVertexDaemonServiceClient interface { GetMonoVertexMetrics(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMonoVertexMetricsResponse, error) GetMonoVertexStatus(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMonoVertexStatusResponse, error) + GetMonoVertexLookback(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMonoVertexLookbackResponse, error) } type monoVertexDaemonServiceClient struct { @@ -77,6 +79,16 @@ func (c *monoVertexDaemonServiceClient) GetMonoVertexStatus(ctx context.Context, return out, nil } +func (c *monoVertexDaemonServiceClient) GetMonoVertexLookback(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*GetMonoVertexLookbackResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetMonoVertexLookbackResponse) + err := c.cc.Invoke(ctx, MonoVertexDaemonService_GetMonoVertexLookback_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // MonoVertexDaemonServiceServer is the server API for MonoVertexDaemonService service. // All implementations must embed UnimplementedMonoVertexDaemonServiceServer // for forward compatibility @@ -85,6 +97,7 @@ func (c *monoVertexDaemonServiceClient) GetMonoVertexStatus(ctx context.Context, type MonoVertexDaemonServiceServer interface { GetMonoVertexMetrics(context.Context, *emptypb.Empty) (*GetMonoVertexMetricsResponse, error) GetMonoVertexStatus(context.Context, *emptypb.Empty) (*GetMonoVertexStatusResponse, error) + GetMonoVertexLookback(context.Context, *emptypb.Empty) (*GetMonoVertexLookbackResponse, error) mustEmbedUnimplementedMonoVertexDaemonServiceServer() } @@ -98,6 +111,9 @@ func (UnimplementedMonoVertexDaemonServiceServer) GetMonoVertexMetrics(context.C func (UnimplementedMonoVertexDaemonServiceServer) GetMonoVertexStatus(context.Context, *emptypb.Empty) (*GetMonoVertexStatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetMonoVertexStatus not implemented") } +func (UnimplementedMonoVertexDaemonServiceServer) GetMonoVertexLookback(context.Context, *emptypb.Empty) (*GetMonoVertexLookbackResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMonoVertexLookback not implemented") +} func (UnimplementedMonoVertexDaemonServiceServer) mustEmbedUnimplementedMonoVertexDaemonServiceServer() { } @@ -148,6 +164,24 @@ func _MonoVertexDaemonService_GetMonoVertexStatus_Handler(srv interface{}, ctx c return interceptor(ctx, in, info, handler) } +func _MonoVertexDaemonService_GetMonoVertexLookback_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MonoVertexDaemonServiceServer).GetMonoVertexLookback(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: MonoVertexDaemonService_GetMonoVertexLookback_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MonoVertexDaemonServiceServer).GetMonoVertexLookback(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + // MonoVertexDaemonService_ServiceDesc is the grpc.ServiceDesc for MonoVertexDaemonService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -163,6 +197,10 @@ var MonoVertexDaemonService_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetMonoVertexStatus", Handler: _MonoVertexDaemonService_GetMonoVertexStatus_Handler, }, + { + MethodName: "GetMonoVertexLookback", + Handler: _MonoVertexDaemonService_GetMonoVertexLookback_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "pkg/apis/proto/mvtxdaemon/mvtxdaemon.proto", diff --git a/pkg/mvtxdaemon/server/service/mvtx_service.go b/pkg/mvtxdaemon/server/service/mvtx_service.go index c52c590ac..f835ce6f7 100644 --- a/pkg/mvtxdaemon/server/service/mvtx_service.go +++ b/pkg/mvtxdaemon/server/service/mvtx_service.go @@ -178,3 +178,10 @@ func (mvs *MonoVertexService) startHealthCheck(ctx context.Context) { } } } + +// GetMonoVertexLookback returns the current lookback of the MonoVertex from the rater +func (mvs *MonoVertexService) GetMonoVertexLookback(ctx context.Context, empty *emptypb.Empty) (*mvtxdaemon.GetMonoVertexLookbackResponse, error) { + return &mvtxdaemon.GetMonoVertexLookbackResponse{ + Lookback: mvs.rater.GetLookBack(), + }, nil +} diff --git a/pkg/mvtxdaemon/server/service/rater/helper.go b/pkg/mvtxdaemon/server/service/rater/helper.go index ce9aac1ed..9f73ce4cd 100644 --- a/pkg/mvtxdaemon/server/service/rater/helper.go +++ b/pkg/mvtxdaemon/server/service/rater/helper.go @@ -49,6 +49,24 @@ func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, p q.Append(tc) } +// UpdateProcessingTime updates the processing time of a batch of messages for a pod at a given time +func UpdateProcessingTime(q *sharedqueue.OverflowQueue[*TimestampedProcessingTime], time int64, podProcessingTime *PodProcessingTime) { + items := q.Items() + + // find the element matching the input timestamp and update it + for _, i := range items { + if i.timestamp == time { + i.Update(podProcessingTime) + return + } + } + + // if we cannot find a matching element, it means we need to add a new timestamped count to the queue + tc := NewTimestampedProcessingTime(time) + tc.Update(podProcessingTime) + q.Append(tc) +} + // CalculateRate calculates the rate of a MonoVertex for a given lookback period. func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64 { counts := q.Items() @@ -104,6 +122,31 @@ func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int { return startIndex } +// findStartIndex finds the index of the first element in the queue that is within the lookback seconds +func findStartIndexPt(lookbackSeconds int64, counts []*TimestampedProcessingTime) int { + n := len(counts) + now := time.Now().Truncate(CountWindow).Unix() + if n < 2 || now-counts[n-2].timestamp > lookbackSeconds { + // if the second last element is already outside the lookback window, we return indexNotFound + return indexNotFound + } + + startIndex := n - 2 + left := 0 + right := n - 2 + lastTimestamp := now - lookbackSeconds + for left <= right { + mid := left + (right-left)/2 + if counts[mid].timestamp >= lastTimestamp { + startIndex = mid + right = mid - 1 + } else { + left = mid + 1 + } + } + return startIndex +} + // calculatePodDelta calculates the difference between the current and previous pod count snapshots func calculatePodDelta(tc1, tc2 *TimestampedCounts) float64 { delta := float64(0) diff --git a/pkg/mvtxdaemon/server/service/rater/rater.go b/pkg/mvtxdaemon/server/service/rater/rater.go index ac7422952..24184c4f4 100644 --- a/pkg/mvtxdaemon/server/service/rater/rater.go +++ b/pkg/mvtxdaemon/server/service/rater/rater.go @@ -23,6 +23,7 @@ import ( "time" "github.com/prometheus/common/expfmt" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/net/context" "google.golang.org/protobuf/types/known/wrapperspb" @@ -34,11 +35,13 @@ import ( const CountWindow = time.Second * 10 const monoVtxReadMetricName = "monovtx_read_total" +const MaxLookback = time.Minute * 10 // MonoVtxRatable is the interface for the Rater struct. type MonoVtxRatable interface { Start(ctx context.Context) error GetRates() map[string]*wrapperspb.DoubleValue + GetLookBack() *wrapperspb.DoubleValue } var _ MonoVtxRatable = (*Rater)(nil) @@ -63,11 +66,19 @@ type Rater struct { podTracker *PodTracker // timestampedPodCounts is a queue of timestamped counts for the MonoVertex timestampedPodCounts *sharedqueue.OverflowQueue[*TimestampedCounts] - // userSpecifiedLookBackSeconds is the user-specified lookback seconds for that MonoVertex - userSpecifiedLookBackSeconds int64 + // timestampedPodProcessingTime is a map between vertex name and a queue of timestamped processing times for that vertex + timestampedPodProcessingTime *sharedqueue.OverflowQueue[*TimestampedProcessingTime] + // userSpecifiedLookBackSeconds the current lookback seconds for the monovertex + // this can be updated dynamically, defaults to user-specified value in the spec + userSpecifiedLookBackSeconds *atomic.Float64 options *options } +// GetLookBack returns the current lookback seconds for the MonoVertex. +func (r *Rater) GetLookBack() *wrapperspb.DoubleValue { + return wrapperspb.Double(r.userSpecifiedLookBackSeconds.Load()) +} + // PodReadCount is a struct to maintain count of messages read by a pod of MonoVertex type PodReadCount struct { // pod name of the pod @@ -95,14 +106,15 @@ func NewRater(ctx context.Context, mv *v1alpha1.MonoVertex, opts ...Option) *Rat }, Timeout: time.Second * 1, }, - log: logging.FromContext(ctx).Named("Rater"), - options: defaultOptions(), + log: logging.FromContext(ctx).Named("Rater"), + options: defaultOptions(), + userSpecifiedLookBackSeconds: atomic.NewFloat64(float64(mv.Spec.Scale.GetLookbackSeconds())), } rater.podTracker = NewPodTracker(ctx, mv) // maintain the total counts of the last 30 minutes(1800 seconds) since we support 1m, 5m, 15m lookback seconds. rater.timestampedPodCounts = sharedqueue.New[*TimestampedCounts](int(1800 / CountWindow.Seconds())) - rater.userSpecifiedLookBackSeconds = int64(mv.Spec.Scale.GetLookbackSeconds()) + rater.timestampedPodProcessingTime = sharedqueue.New[*TimestampedProcessingTime](int(1800 / CountWindow.Seconds())) for _, opt := range opts { if opt != nil { @@ -138,17 +150,24 @@ func (r *Rater) monitorOnePod(ctx context.Context, key string, worker int) error return err } var podReadCount *PodReadCount + var processingTime *PodProcessingTime if r.podTracker.IsActive(key) { podReadCount = r.getPodReadCounts(pInfo.podName) if podReadCount == nil { log.Debugf("Failed retrieving total podReadCounts for pod %s", pInfo.podName) } + processingTime = r.getPodProcessingTime(pInfo.podName) + if processingTime == nil { + log.Debugf("Failed retrieving total processingTime for pod %s", pInfo.podName) + } } else { log.Debugf("Pod %s does not exist, updating it with nil...", pInfo.podName) podReadCount = nil + processingTime = nil } now := time.Now().Add(CountWindow).Truncate(CountWindow).Unix() UpdateCount(r.timestampedPodCounts, now, podReadCount) + UpdateProcessingTime(r.timestampedPodProcessingTime, now, processingTime) return nil } @@ -203,7 +222,8 @@ func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue { } func (r *Rater) buildLookbackSecondsMap() map[string]int64 { - lookbackSecondsMap := map[string]int64{"default": r.userSpecifiedLookBackSeconds} + lbValue := r.userSpecifiedLookBackSeconds.Load() + lookbackSecondsMap := map[string]int64{"default": int64(lbValue)} for k, v := range fixedLookbackSeconds { lookbackSecondsMap[k] = v } @@ -223,6 +243,10 @@ func (r *Rater) Start(ctx context.Context) error { } }() + // start the dynamic lookback check which will be + // calculating and updating the lookback period based on the processing time + go r.startDynamicLookBack(ctx) + // Worker group for i := 1; i <= r.options.workers; i++ { go r.monitor(ctx, i, keyCh) @@ -269,3 +293,22 @@ func sleep(ctx context.Context, duration time.Duration) { case <-time.After(duration): } } + +// startDynamicLookBack continuously adjusts ths lookback duration based on the current +// processing time of the MonoVertex system. +func (r *Rater) startDynamicLookBack(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + // Ensure the ticker is stopped to prevent a resource leak. + defer ticker.Stop() + for { + select { + case <-ticker.C: + // The updateDynamicLookbackSecs method is called which adjusts the + // lookback duration based on current conditions. + r.updateDynamicLookbackSecs() + case <-ctx.Done(): + // If the context is canceled or expires exit + return + } + } +} diff --git a/pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go b/pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go new file mode 100644 index 000000000..f84711290 --- /dev/null +++ b/pkg/mvtxdaemon/server/service/rater/rater_lookback_utils.go @@ -0,0 +1,175 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rater + +import ( + "fmt" + "math" + + "github.com/prometheus/common/expfmt" + + "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" + sharedqueue "github.com/numaproj/numaflow/pkg/shared/queue" +) + +// PodProcessingTime is a struct to maintain the required data +// to calculate the processing time for a batch by a pod +type PodProcessingTime struct { + name string + processingTimeSum float64 + processingTimeCount float64 +} + +func (p *PodProcessingTime) Name() string { + return p.name +} + +func (p *PodProcessingTime) processingTimeValues() (float64, float64) { + return p.processingTimeSum, p.processingTimeCount +} + +// updateDynamicLookbackSecs updates the default lookback period of a vertex based on the processing rate. +// It is intended to optimize the responsiveness of the system to changes based on its +// current load and performance characteristics. +func (r *Rater) updateDynamicLookbackSecs() { + // calculate rates for each look back seconds + vertexName := r.monoVertex.Name + processingTimeSeconds, update := r.CalculateVertexProcessingTime(r.timestampedPodProcessingTime) + if !update { + return + } + r.log.Debugf("Calulcated processingTimeSeconds for mvtx %s : %f ", vertexName, processingTimeSeconds) + // if the current calculated processing time is greater than the lookback Seconds, update it + currentVal := r.userSpecifiedLookBackSeconds.Load() + // round up to the nearest minute, also ensure that while going up and down we have the consistent value for + // a given processingTimeSeconds, then convert back to seconds + roundedProcessingTime := 60 * int(math.Ceil(processingTimeSeconds/60)) + // step up case + if roundedProcessingTime > int(currentVal) { + r.userSpecifiedLookBackSeconds.Store(math.Min(MaxLookback.Seconds(), float64(roundedProcessingTime))) + r.log.Infof("Lookback updated for mvtx %s, old %f new %d", vertexName, currentVal, roundedProcessingTime) + } else { + // step down case + // We should not be setting values lower than the lookBackSeconds defined in the spec + roundedProcessingTime = int(math.Max(float64(roundedProcessingTime), float64(r.monoVertex.Spec.Scale.GetLookbackSeconds()))) + if roundedProcessingTime != int(currentVal) { + r.userSpecifiedLookBackSeconds.Store(float64(roundedProcessingTime)) + r.log.Infof("Lookback updated for mvtx %s, old %f new %d", vertexName, currentVal, roundedProcessingTime) + } + } +} + +func (r *Rater) CalculateVertexProcessingTime(q *sharedqueue.OverflowQueue[*TimestampedProcessingTime]) (float64, bool) { + counts := q.Items() + currentLookbackSecs := r.userSpecifiedLookBackSeconds.Load() + // If we do not have enough data points, lets send back the default from the vertex + // or if we are gating at the max lookback + if len(counts) <= 1 || (currentLookbackSecs >= MaxLookback.Seconds()) { + return currentLookbackSecs, false + } + // Checking for 3 look back periods right now -> this will be gated to 30 min + // as that is the max data that we store + startIndex := findStartIndexPt(int64(currentLookbackSecs*3), counts) + // we consider the last but one element as the end index because the last element might be incomplete + // we can be sure that the last but one element in the queue is complete. + endIndex := len(counts) - 2 + // If we do not have data from previous timeline, then return the current + // lookback time itself + if startIndex == indexNotFound { + return currentLookbackSecs, false + } + + // time diff in seconds. + timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp + if timeDiff == 0 { + // if the time difference is 0, we return 0 to avoid division by 0 + // this should not happen in practice because we are using a 10s interval + return currentLookbackSecs, false + } + + cumulativeTime := make(map[string]float64) + count := make(map[string]int) + + // Iterate over the range of items and aggregate it per pod level + for i := startIndex; i <= endIndex; i++ { + item := counts[i] + if item == nil { + continue + } + vals := item.PodProcessingTimeSnapshot() + for pod, ptTime := range vals { + cumulativeTime[pod] += ptTime + count[pod]++ + } + } + + // TODO(mvtx-adapt): Check with EWMA if that helps better + maxAverage := 0.0 + // Calculate averages and find the maximum between the pods + for pod, totalTime := range cumulativeTime { + if totalCnt := count[pod]; totalCnt > 0 { + average := totalTime / float64(totalCnt) + if average > maxAverage { + maxAverage = average + } + } + + } + // Return the maximum average processing time + return maxAverage, true +} + +// getPodProcessingTime is a utility function to get the metrics from the pod +func (r *Rater) getPodProcessingTime(podName string) *PodProcessingTime { + processingTimeCountMetric := "monovtx_processing_time" + headlessServiceName := r.monoVertex.GetHeadlessServiceName() + // scrape the read total metric from pod metric port + // example for 0th pod: https://simple-mono-vertex-mv-0.simple-mono-vertex-mv-headless.default.svc:2469/metrics + url := fmt.Sprintf("https://%s.%s.%s.svc:%v/metrics", podName, headlessServiceName, r.monoVertex.Namespace, v1alpha1.MonoVertexMetricsPort) + resp, err := r.httpClient.Get(url) + if err != nil { + r.log.Warnf("[Pod name %s]: failed reading the metrics endpoint, the pod might have been scaled down: %v", podName, err.Error()) + return nil + } + defer resp.Body.Close() + + textParser := expfmt.TextParser{} + result, err := textParser.TextToMetricFamilies(resp.Body) + if err != nil { + r.log.Errorf("[Pod name %s]: failed parsing to prometheus metric families, %v", podName, err.Error()) + return nil + } + + var podSum, podCount float64 + if value, ok := result[processingTimeCountMetric]; ok && value != nil && len(value.GetMetric()) > 0 { + metricsList := value.GetMetric() + // Each pod should be emitting only one metric with this name, so we should be able to take the first value + // from the results safely. + // https://github.com/prometheus/client_rust/issues/194 + ele := metricsList[0] + podCount = float64(ele.Histogram.GetSampleCount()) + podSum = ele.Histogram.GetSampleSum() + } else { + r.log.Debugf("[Pod name %s]: Metric %q is unavailable, the pod might haven't started processing data", podName, processingTimeCountMetric) + return nil + } + return &PodProcessingTime{ + name: podName, + processingTimeSum: podSum, + processingTimeCount: podCount, + } +} diff --git a/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts.go b/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts.go new file mode 100644 index 000000000..96c602695 --- /dev/null +++ b/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts.go @@ -0,0 +1,72 @@ +/* +Copyright 2022 The Numaproj Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rater + +import ( + "fmt" + "sync" +) + +// TimestampedProcessingTime track the total count of processed messages for a list of pods at a given timestamp +type TimestampedProcessingTime struct { + // timestamp in seconds is the time when the count is recorded + timestamp int64 + // the key of podProcessingTime represents the pod name, the value represents the batch processing time + podProcessingTime map[string]float64 + lock *sync.RWMutex +} + +func NewTimestampedProcessingTime(t int64) *TimestampedProcessingTime { + return &TimestampedProcessingTime{ + timestamp: t, + podProcessingTime: make(map[string]float64), + lock: new(sync.RWMutex), + } +} + +// Update updates the count of processed messages for a pod +func (tc *TimestampedProcessingTime) Update(podReadCount *PodProcessingTime) { + tc.lock.Lock() + defer tc.lock.Unlock() + if podReadCount == nil { + return + } + sum, count := podReadCount.processingTimeValues() + + // Convert microseconds to seconds + microseconds := sum / count + seconds := microseconds / 1000000.0 + // convert this to nearest seconds + tc.podProcessingTime[podReadCount.Name()] = seconds +} + +// PodProcessingTimeSnapshot returns a copy of podProcessingTime +// it's used to ensure the returned map is not modified by other goroutines +func (tc *TimestampedProcessingTime) PodProcessingTimeSnapshot() map[string]float64 { + tc.lock.RLock() + defer tc.lock.RUnlock() + counts := make(map[string]float64) + for k, v := range tc.podProcessingTime { + counts[k] = v + } + return counts +} + +// String returns a string representation of the TimestampedProcessingTime +// it's used for debugging purpose +func (tc *TimestampedProcessingTime) String() string { + tc.lock.RLock() + defer tc.lock.RUnlock() + return fmt.Sprintf("{timestamp: %d, podProcessingTime: %v}", tc.timestamp, tc.podProcessingTime) +} diff --git a/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts_test.go b/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts_test.go new file mode 100644 index 000000000..3694f392e --- /dev/null +++ b/pkg/mvtxdaemon/server/service/rater/timestamped_processing_counts_test.go @@ -0,0 +1,89 @@ +package rater + +import ( + "fmt" + "reflect" + "testing" + "time" +) + +// Test the constructor function +func TestNewTimestampedProcessingTime(t *testing.T) { + currentTime := time.Now().Unix() + tpt := NewTimestampedProcessingTime(currentTime) + + if tpt.timestamp != currentTime { + t.Errorf("Expected timestamp %v, got %v", currentTime, tpt.timestamp) + } + + if len(tpt.podProcessingTime) != 0 { + t.Errorf("Expected empty podProcessingTime map, got %v", tpt.podProcessingTime) + } + + if tpt.lock == nil { + t.Errorf("Expected non-nil lock") + } +} + +// Test the Update method +func TestUpdate(t *testing.T) { + tpt := NewTimestampedProcessingTime(time.Now().Unix()) + podName := "Pod1" + // Creating a new PodProcessingTime + newPod := &PodProcessingTime{ + name: podName, + processingTimeSum: 2400000, + processingTimeCount: 2, + } + + tpt.Update(newPod) + + if len(tpt.podProcessingTime) != 1 { + t.Fatalf("Expected exactly 1 entry in podProcessingTime, got %v", len(tpt.podProcessingTime)) + } + + expectedProcessingTime := 1.2 // ( (2400000/ 2 ) / 1000000 + if tpt.podProcessingTime[podName] != expectedProcessingTime { + t.Errorf("Expected processing time %v, got %v", expectedProcessingTime, tpt.podProcessingTime[podName]) + } + + // Test updating with nil should not change anything + beforeUpdate := tpt.PodProcessingTimeSnapshot() + tpt.Update(nil) + afterUpdate := tpt.PodProcessingTimeSnapshot() + + if len(beforeUpdate) != len(afterUpdate) { + t.Fatalf("Update with nil changed the contents of the map") + } +} + +// Test the PodProcessingTimeSnapshot method +func TestPodProcessingTimeSnapshot(t *testing.T) { + tpt := NewTimestampedProcessingTime(time.Now().Unix()) + tpt.podProcessingTime["Pod1"] = 1.5 + tpt.podProcessingTime["Pod2"] = 2.5 + + snapshot := tpt.PodProcessingTimeSnapshot() + + if !reflect.DeepEqual(snapshot, tpt.podProcessingTime) { + t.Errorf("Snapshot does not match the original map") + } + + // Test that modifying the snapshot does not affect the original + snapshot["Pod1"] = 10.0 + if tpt.podProcessingTime["Pod1"] == 10.0 { + t.Errorf("Modifying snapshot affected the original map") + } +} + +// Test the String method +func TestString(t *testing.T) { + tpt := NewTimestampedProcessingTime(time.Now().Unix()) + tpt.podProcessingTime["Pod1"] = 1.5 + + expected := fmt.Sprintf("{timestamp: %d, podProcessingTime: map[Pod1:%v]}", tpt.timestamp, 1.5) + result := tpt.String() + if result != expected { + t.Errorf("String() output is incorrect. Expected %v, got %v", expected, result) + } +} diff --git a/rust/Cargo.lock b/rust/Cargo.lock index beec59aa4..416d43c71 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -80,7 +80,7 @@ dependencies = [ "thiserror 1.0.69", "time", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tracing", "tryhard", "url", @@ -114,7 +114,7 @@ dependencies = [ "thiserror 1.0.69", "time", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-util", "tokio-websockets", "tracing", @@ -278,7 +278,7 @@ dependencies = [ "rustls-pemfile 2.2.0", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tower 0.4.13", "tower-service", ] @@ -705,6 +705,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1074,7 +1089,7 @@ dependencies = [ "pin-project-lite", "rustls-native-certs 0.7.3", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tower-service", ] @@ -1107,7 +1122,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tower-service", "webpki-roots 0.26.7", ] @@ -1125,6 +1140,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.31", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -1608,6 +1636,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nkeys" version = "0.4.4" @@ -1756,8 +1801,12 @@ dependencies = [ "bytes", "chrono", "futures", + "http-body-util", + "hyper 1.5.1", + "hyper-rustls 0.27.3", "hyper-util", "kube", + "log", "numaflow 0.2.1", "numaflow-models", "numaflow-pb", @@ -1771,6 +1820,7 @@ dependencies = [ "pulsar", "rand", "rcgen", + "reqwest 0.11.27", "rustls 0.23.19", "semver", "serde", @@ -1779,6 +1829,7 @@ dependencies = [ "tempfile", "thiserror 2.0.3", "tokio", + "tokio-rustls 0.26.1", "tokio-stream", "tokio-util", "tonic", @@ -1842,12 +1893,50 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "openssl" +version = "0.10.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -2025,6 +2114,12 @@ dependencies = [ "spki", ] +[[package]] +name = "pkg-config" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" + [[package]] name = "portable-atomic" version = "1.10.0" @@ -2434,11 +2529,13 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.31", "hyper-rustls 0.24.2", + "hyper-tls", "ipnet", "js-sys", "log", "mime", "mime_guess", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -2450,6 +2547,7 @@ dependencies = [ "sync_wrapper 0.1.2", "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tower-service", "url", @@ -2492,7 +2590,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.2", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tower-service", "url", "wasm-bindgen", @@ -3227,6 +3325,16 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-retry" version = "0.3.0" @@ -3250,12 +3358,11 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.26.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +checksum = "5f6d0975eaace0cf0fcadee4e4aaa5da15b5c079146f2cffb67c113be122bf37" dependencies = [ "rustls 0.23.19", - "rustls-pki-types", "tokio", ] @@ -3300,7 +3407,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-util", ] @@ -3325,8 +3432,10 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.13.3", + "rustls-pemfile 2.2.0", "socket2", "tokio", + "tokio-rustls 0.26.1", "tokio-stream", "tower 0.4.13", "tower-layer", @@ -3594,6 +3703,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index 38cabb704..b5386bc33 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -21,7 +21,7 @@ serving.workspace = true backoff.workspace = true axum.workspace = true axum-server.workspace = true -tonic = "0.12.3" +tonic = { version = "0.12.3", features = ["tls"] } bytes = "1.7.1" thiserror = "2.0.3" tokio-util = "0.7.11" @@ -30,12 +30,11 @@ prost = "0.13.2" prost-types = "0.13.1" chrono = "0.4.31" base64 = "0.22.1" -hyper-util = "0.1.6" tower = "0.4.13" serde_json = "1.0.122" trait-variant = "0.1.2" rcgen = "0.13.1" -rustls = { version = "0.23.12", features = ["aws_lc_rs"] } +rustls = { version = "0.23.19", features = ["aws_lc_rs", "ring"] } serde = { version = "1.0.204", features = ["derive"] } semver = "1.0" pep440_rs = "0.6.6" @@ -46,6 +45,13 @@ futures = "0.3.30" pin-project = "1.1.5" rand = "0.8.5" async-nats = "0.38.0" +tokio-rustls = "0.26.1" +reqwest = "0.11.27" +hyper-util = "0.1.10" +hyper = "1.5.1" +hyper-rustls = { version = "0.27.3", default-features = false, features = ["http2"] } +http-body-util = "0.1.2" +log = "0.4.22" [dev-dependencies] tempfile = "3.11.0" diff --git a/rust/numaflow-core/src/config/monovertex.rs b/rust/numaflow-core/src/config/monovertex.rs index 75f5a8cb9..c1278a7f1 100644 --- a/rust/numaflow-core/src/config/monovertex.rs +++ b/rust/numaflow-core/src/config/monovertex.rs @@ -3,9 +3,10 @@ use std::time::Duration; use base64::prelude::BASE64_STANDARD; use base64::Engine; -use numaflow_models::models::MonoVertex; use serde_json::from_slice; +use numaflow_models::models::MonoVertex; + use crate::config::components::metrics::MetricsConfig; use crate::config::components::sink::SinkConfig; use crate::config::components::source::{GeneratorConfig, SourceConfig}; @@ -21,6 +22,7 @@ use crate::Result; const DEFAULT_BATCH_SIZE: u64 = 500; const DEFAULT_TIMEOUT_IN_MS: u32 = 1000; const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120; +const DAEMON_SERVICE_PORT: i16 = 4327; #[derive(Debug, Clone, PartialEq)] pub(crate) struct MonovertexConfig { @@ -33,6 +35,7 @@ pub(crate) struct MonovertexConfig { pub(crate) transformer_config: Option, pub(crate) fb_sink_config: Option, pub(crate) metrics_config: MetricsConfig, + pub(crate) daemon_server_address: String, } impl Default for MonovertexConfig { @@ -53,6 +56,7 @@ impl Default for MonovertexConfig { transformer_config: None, fb_sink_config: None, metrics_config: MetricsConfig::default(), + daemon_server_address: "".to_string(), } } } @@ -143,12 +147,22 @@ impl MonovertexConfig { .and_then(|scale| scale.lookback_seconds.map(|x| x as u16)) .unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS); + let mono_vertex_namespace = mono_vertex_obj + .metadata + .as_ref() + .and_then(|metadata| metadata.namespace.clone()) + .unwrap_or("default".to_string()); + Ok(MonovertexConfig { - name: mono_vertex_name, + name: mono_vertex_name.clone(), replica: *get_vertex_replica(), batch_size: batch_size as usize, read_timeout: Duration::from_millis(timeout_in_ms as u64), metrics_config: MetricsConfig::with_lookback_window_in_secs(look_back_window), + daemon_server_address: get_daemon_server_address( + mono_vertex_name, + mono_vertex_namespace, + ), source_config, sink_config, transformer_config, @@ -157,6 +171,15 @@ impl MonovertexConfig { } } +fn get_daemon_server_address(mvtx_name: String, mvtx_namespace: String) -> String { + format!( + "{}-mv-daemon-svc.{}.svc:{}", + mvtx_name.to_lowercase(), + mvtx_namespace.to_lowercase(), + DAEMON_SERVICE_PORT + ) +} + #[cfg(test)] mod tests { use base64::prelude::BASE64_STANDARD; @@ -376,4 +399,91 @@ mod tests { ); } } + + #[test] + fn test_load_with_lookback_window() { + let valid_config = r#" + { + "metadata": { + "name": "test_vertex" + }, + "spec": { + "limits": { + "readBatchSize": 1000, + "readTimeout": "2s" + }, + "source": { + "udsource": { + "container": { + "image": "xxxxxxx", + "resources": {} + } + } + }, + "sink": { + "log": {} + }, + "scale": { + "lookbackSeconds": 300 + } + } + } + "#; + let encoded_invalid_config = BASE64_STANDARD.encode(valid_config); + let spec = encoded_invalid_config.as_str(); + + let config = MonovertexConfig::load(spec.to_string()).unwrap(); + + assert_eq!(config.name, "test_vertex"); + assert_eq!(config.metrics_config.lookback_window_in_secs, 300); + } + + #[test] + fn test_daeomon_server_address() { + let mvtx_name = "test_vertex".to_string(); + let mvtx_namespace = "test_namespace".to_string(); + let valid_config = r#" + { + "metadata": { + "name": "test_vertex", + "namespace": "test_namespace" + }, + "spec": { + "limits": { + "readBatchSize": 1000, + "readTimeout": "2s" + }, + "source": { + "udsource": { + "container": { + "image": "xxxxxxx", + "resources": {} + } + } + }, + "sink": { + "log": {} + }, + "scale": { + "lookbackSeconds": 300 + } + } + } + "#; + let encoded_invalid_config = BASE64_STANDARD.encode(valid_config); + let spec = encoded_invalid_config.as_str(); + + let config = MonovertexConfig::load(spec.to_string()).unwrap(); + let daemon_server_address = config.daemon_server_address; + + assert_eq!( + daemon_server_address, + format!( + "{}-mv-daemon-svc.{}.svc:{}", + mvtx_name.to_lowercase(), + mvtx_namespace.to_lowercase(), + 4327 + ) + ); + } } diff --git a/rust/numaflow-core/src/lib.rs b/rust/numaflow-core/src/lib.rs index d65380f8d..61b4ffcc0 100644 --- a/rust/numaflow-core/src/lib.rs +++ b/rust/numaflow-core/src/lib.rs @@ -4,11 +4,10 @@ use tokio_util::sync::CancellationToken; use tracing::{error, info}; use crate::config::{config, CustomResourceType}; +pub(crate) use crate::error::{Error, Result}; /// Custom Error handling. mod error; -pub(crate) use crate::error::{Error, Result}; - /// [MonoVertex] is a simplified version of the [Pipeline] spec which is ideal for high TPS, low latency /// use-cases which do not require [ISB]. /// @@ -55,6 +54,10 @@ mod tracker; mod mapper; pub async fn run() -> Result<()> { + rustls::crypto::ring::default_provider() + .install_default() + .expect("Failed to install rustls crypto provider"); + let cln_token = CancellationToken::new(); let shutdown_cln_token = cln_token.clone(); diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index fa79e457b..f9dea690e 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -1,8 +1,9 @@ -use std::collections::BTreeMap; -use std::iter; +use std::collections::{BTreeMap, HashMap}; +use std::io::ErrorKind; use std::net::SocketAddr; use std::sync::{Arc, OnceLock}; use std::time::Duration; +use std::{env, iter}; use axum::body::Body; use axum::extract::State; @@ -10,10 +11,6 @@ use axum::http::{Response, StatusCode}; use axum::response::IntoResponse; use axum::{routing::get, Router}; use axum_server::tls_rustls::RustlsConfig; -use numaflow_pb::clients::map::map_client::MapClient; -use numaflow_pb::clients::sink::sink_client::SinkClient; -use numaflow_pb::clients::source::source_client::SourceClient; -use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; use prometheus_client::encoding::text::encode; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; @@ -26,9 +23,16 @@ use tokio::task::JoinHandle; use tokio::time; use tonic::transport::Channel; use tonic::Request; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; + +use numaflow_pb::clients::map::map_client::MapClient; +use numaflow_pb::clients::mvtxdaemon::mono_vertex_daemon_service_client::MonoVertexDaemonServiceClient; +use numaflow_pb::clients::sink::sink_client::SinkClient; +use numaflow_pb::clients::source::source_client::SourceClient; +use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; use crate::config::{get_pipeline_name, get_vertex_name, get_vertex_replica}; +use crate::shared::insecure_tls::HTTPSClient; use crate::source::Source; use crate::Error; @@ -784,7 +788,11 @@ impl PendingReader { /// - Another to periodically expose the pending metrics. /// /// Dropping the PendingReaderTasks will abort the background tasks. - pub async fn start(&self, is_mono_vertex: bool) -> PendingReaderTasks { + pub async fn start( + &self, + is_mono_vertex: bool, + daemon_server_address: String, + ) -> PendingReaderTasks { let pending_reader = self.lag_reader.clone(); let lag_checking_interval = self.lag_checking_interval; let refresh_interval = self.refresh_interval; @@ -802,6 +810,7 @@ impl PendingReader { refresh_interval, pending_stats, lookback_seconds, + daemon_server_address, ) .await; }); @@ -863,6 +872,7 @@ async fn expose_pending_metrics( refresh_interval: Duration, pending_stats: Arc>>, lookback_seconds: u16, + daemon_server_address: String, ) { let mut ticker = time::interval(refresh_interval); @@ -870,15 +880,49 @@ async fn expose_pending_metrics( // string concat is more efficient? let mut pending_info: BTreeMap<&str, i64> = BTreeMap::new(); - let lookback_seconds_map: [(&str, u16); 4] = [ + let mut lookback_seconds_map: [(&str, u16); 4] = [ ("1m", 60), ("default", lookback_seconds), ("5m", 300), ("15m", 900), ]; + let mut daemon_client: Option> = None; + loop { ticker.tick().await; + + // Update the lookback seconds from the daemon server + // Currently only monovertex is supported + if !daemon_server_address.is_empty() && is_mono_vertex { + // only create the client if not already created + if daemon_client.is_none() { + daemon_client = create_mvtx_daemon_client(daemon_server_address.clone()).await; + } + if let Some(ref mut client) = daemon_client { + let res = client.get_mono_vertex_lookback(Request::new(())).await; + if let Ok(response) = res { + let lookback = response.into_inner().lookback.unwrap() as u16; + // Update the lookback seconds if it has changed + if lookback != lookback_seconds_map[1].1 { + lookback_seconds_map[1] = ("default", lookback); + info!("Updated lookback seconds to {}", lookback); + } + } else { + // Trigger reinitialization of client in the next iteration + // to handle any transient errors + warn!( + "Error fetching lookback from MonoVertex daemon, will try to reconnect. {:?}", + res.err() + ); + daemon_client = None; + } + } else { + warn!("Unable to establish client connection. Trying again in the next loop iteration."); + daemon_client = None; + } + } + for (label, seconds) in lookback_seconds_map { let pending = calculate_pending(seconds as i64, &pending_stats).await; if pending != -1 { @@ -934,6 +978,40 @@ async fn calculate_pending( result } +async fn create_mvtx_daemon_client( + daemon_server_address: String, +) -> Option> { + let https_client = crate::shared::insecure_tls::new_https_client().ok()?; + let addr = tokio::net::lookup_host(daemon_server_address) + .await + .ok()? + .find_map(|socket_addr| match socket_addr { + std::net::SocketAddr::V4(ipv4) => Some(ipv4.to_string()), + _ => None, + })?; + + match tokio::net::TcpStream::connect(&addr).await { + Ok(_) => { + let uri = hyper::Uri::builder() + .authority(addr) + .scheme("https") + .path_and_query("/") + .build() + .unwrap(); + + let client = MonoVertexDaemonServiceClient::with_origin(https_client, uri); + Some(client) + } + Err(err) => { + warn!( + "Failed to connect to Monovertex daemon server at {}: {}", + addr, err + ); + None + } + } +} + #[cfg(test)] mod tests { use std::net::SocketAddr; @@ -943,9 +1021,10 @@ mod tests { use numaflow::{sink, source, sourcetransform}; use tokio::sync::mpsc::Sender; - use super::*; use crate::shared::grpc::create_rpc_channel; + use super::*; + struct SimpleSource; #[tonic::async_trait] impl source::Sourcer for SimpleSource { @@ -1130,8 +1209,14 @@ mod tests { tokio::spawn({ let pending_stats = Arc::clone(&pending_stats); async move { - expose_pending_metrics(true, refresh_interval, pending_stats, lookback_seconds) - .await; + expose_pending_metrics( + true, + refresh_interval, + pending_stats, + lookback_seconds, + "".to_string(), + ) + .await; } }); // We use tokio::time::interval() as the ticker in the expose_pending_metrics() function. diff --git a/rust/numaflow-core/src/monovertex.rs b/rust/numaflow-core/src/monovertex.rs index 1518a3c9f..a197717ba 100644 --- a/rust/numaflow-core/src/monovertex.rs +++ b/rust/numaflow-core/src/monovertex.rs @@ -1,7 +1,8 @@ -use forwarder::ForwarderBuilder; use tokio_util::sync::CancellationToken; use tracing::info; +use forwarder::ForwarderBuilder; + use crate::config::is_mono_vertex; use crate::config::monovertex::MonovertexConfig; use crate::error::{self}; @@ -83,7 +84,9 @@ async fn start( // start the pending reader to publish pending metrics let pending_reader = shared::metrics::create_pending_reader(&mvtx_config.metrics_config, source.clone()).await; - let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await; + let _pending_reader_handle = pending_reader + .start(is_mono_vertex(), mvtx_config.daemon_server_address.clone()) + .await; let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token); diff --git a/rust/numaflow-core/src/shared.rs b/rust/numaflow-core/src/shared.rs index 3d8461b17..af7e51972 100644 --- a/rust/numaflow-core/src/shared.rs +++ b/rust/numaflow-core/src/shared.rs @@ -13,3 +13,7 @@ pub(crate) mod create_components; /// Shared methods for forwarding messages. pub(crate) mod forward; + +/// Provides a hyper HTTPS connector with custom server certificate validation implementation. +/// It can be used to create gRPC clients where the gRPC server uses self-signed certificates. +pub(crate) mod insecure_tls; diff --git a/rust/numaflow-core/src/shared/insecure_tls.rs b/rust/numaflow-core/src/shared/insecure_tls.rs new file mode 100644 index 000000000..9faf6234e --- /dev/null +++ b/rust/numaflow-core/src/shared/insecure_tls.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use bytes::Bytes; +use hyper_rustls::HttpsConnector; +use hyper_util::{ + client::legacy::{connect::HttpConnector, Client}, + rt::TokioExecutor, +}; +use rustls::{self, pki_types::CertificateDer, ClientConfig}; +use tonic::Status; + +#[derive(Debug)] +struct SkipServerVerification; + +// TLS server certificate verifier to accept self-signed certs when using rustls. +// The rustls does not provide a direct option to equivalent to Golang's `tls.Config.InsecureSkipVerify`. +impl SkipServerVerification { + fn new() -> Arc { + Arc::new(Self) + } +} + +impl rustls::client::danger::ServerCertVerifier for SkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp_response: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &rustls::DigitallySignedStruct, + ) -> Result { + Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + rustls::SignatureScheme::RSA_PKCS1_SHA1, + rustls::SignatureScheme::ECDSA_SHA1_Legacy, + rustls::SignatureScheme::RSA_PKCS1_SHA256, + rustls::SignatureScheme::ECDSA_NISTP256_SHA256, + rustls::SignatureScheme::RSA_PKCS1_SHA384, + rustls::SignatureScheme::ECDSA_NISTP384_SHA384, + rustls::SignatureScheme::RSA_PKCS1_SHA512, + rustls::SignatureScheme::ECDSA_NISTP521_SHA512, + rustls::SignatureScheme::RSA_PSS_SHA256, + rustls::SignatureScheme::RSA_PSS_SHA384, + rustls::SignatureScheme::RSA_PSS_SHA512, + rustls::SignatureScheme::ED25519, + rustls::SignatureScheme::ED448, + ] + } +} + +pub(crate) type HTTPSClient = Client< + HttpsConnector, + http_body_util::combinators::UnsyncBoxBody, +>; + +/// Creates an HTTPS client that can be used to connect to servers that use self-signed certificates +/// It can be used to create tonic gRPC client. +/// REST API, https://docs.rs/reqwest/0.12.9/reqwest/struct.ClientBuilder.html#method.danger_accept_invalid_certs would be the easier option. +pub fn new_https_client() -> Result> +{ + let tls = ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_no_client_auth(); + + let mut http = HttpConnector::new(); + http.enforce_http(false); + + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(tls) + .https_only() + .enable_http2() + .wrap_connector(http); + + let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()) + .http2_only(true) + .build(connector); + Ok(client) +} diff --git a/rust/numaflow-pb/Makefile b/rust/numaflow-pb/Makefile index 7d2d8a03c..0f328bd8a 100644 --- a/rust/numaflow-pb/Makefile +++ b/rust/numaflow-pb/Makefile @@ -8,6 +8,9 @@ generate: clean rm -rf src/clients/*.rs rm -rf src/objects/*.rs cp -r ../../pkg/apis/proto proto + mkdir -p proto/google/api + curl -Ls https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/annotations.proto -o proto/google/api/annotations.proto + curl -Ls https://raw.githubusercontent.com/googleapis/googleapis/master/google/api/http.proto -o proto/google/api/http.proto mv src/clients.rs /tmp/clients.rs.bak mv src/objects.rs /tmp/objects.rs.bak > src/clients.rs diff --git a/rust/numaflow-pb/src/clients.rs b/rust/numaflow-pb/src/clients.rs index 31c76191f..93f42dd2d 100644 --- a/rust/numaflow-pb/src/clients.rs +++ b/rust/numaflow-pb/src/clients.rs @@ -29,3 +29,7 @@ pub mod sessionreduce; #[path = "clients/sideinput.v1.rs"] #[rustfmt::skip] pub mod sideinput; + +#[path = "clients/mvtxdaemon.rs"] +#[rustfmt::skip] +pub mod mvtxdaemon; diff --git a/rust/numaflow-pb/src/clients/google.api.rs b/rust/numaflow-pb/src/clients/google.api.rs new file mode 100644 index 000000000..635ce371a --- /dev/null +++ b/rust/numaflow-pb/src/clients/google.api.rs @@ -0,0 +1,360 @@ +// This file is @generated by prost-build. +/// Defines the HTTP configuration for an API service. It contains a list of +/// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method +/// to one or more HTTP REST API methods. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Http { + /// A list of HTTP configuration rules that apply to individual API methods. + /// + /// **NOTE:** All service configuration rules follow "last one wins" order. + #[prost(message, repeated, tag = "1")] + pub rules: ::prost::alloc::vec::Vec, + /// When set to true, URL path parameters will be fully URI-decoded except in + /// cases of single segment matches in reserved expansion, where "%2F" will be + /// left encoded. + /// + /// The default behavior is to not decode RFC 6570 reserved characters in multi + /// segment matches. + #[prost(bool, tag = "2")] + pub fully_decode_reserved_expansion: bool, +} +/// gRPC Transcoding +/// +/// gRPC Transcoding is a feature for mapping between a gRPC method and one or +/// more HTTP REST endpoints. It allows developers to build a single API service +/// that supports both gRPC APIs and REST APIs. Many systems, including [Google +/// APIs](), +/// [Cloud Endpoints](), [gRPC +/// Gateway](), +/// and [Envoy]() proxy support this feature +/// and use it for large scale production services. +/// +/// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies +/// how different portions of the gRPC request message are mapped to the URL +/// path, URL query parameters, and HTTP request body. It also controls how the +/// gRPC response message is mapped to the HTTP response body. `HttpRule` is +/// typically specified as an `google.api.http` annotation on the gRPC method. +/// +/// Each mapping specifies a URL path template and an HTTP method. The path +/// template may refer to one or more fields in the gRPC request message, as long +/// as each field is a non-repeated field with a primitive (non-message) type. +/// The path template controls how fields of the request message are mapped to +/// the URL path. +/// +/// Example: +/// +/// service Messaging { +/// rpc GetMessage(GetMessageRequest) returns (Message) { +/// option (google.api.http) = { +/// get: "/v1/{name=messages/*}" +/// }; +/// } +/// } +/// message GetMessageRequest { +/// string name = 1; // Mapped to URL path. +/// } +/// message Message { +/// string text = 1; // The resource content. +/// } +/// +/// This enables an HTTP REST to gRPC mapping as below: +/// +/// - HTTP: `GET /v1/messages/123456` +/// - gRPC: `GetMessage(name: "messages/123456")` +/// +/// Any fields in the request message which are not bound by the path template +/// automatically become HTTP query parameters if there is no HTTP request body. +/// For example: +/// +/// service Messaging { +/// rpc GetMessage(GetMessageRequest) returns (Message) { +/// option (google.api.http) = { +/// get:"/v1/messages/{message_id}" +/// }; +/// } +/// } +/// message GetMessageRequest { +/// message SubMessage { +/// string subfield = 1; +/// } +/// string message_id = 1; // Mapped to URL path. +/// int64 revision = 2; // Mapped to URL query parameter `revision`. +/// SubMessage sub = 3; // Mapped to URL query parameter `sub.subfield`. +/// } +/// +/// This enables a HTTP JSON to RPC mapping as below: +/// +/// - HTTP: `GET /v1/messages/123456?revision=2&sub.subfield=foo` +/// - gRPC: `GetMessage(message_id: "123456" revision: 2 sub: +/// SubMessage(subfield: "foo"))` +/// +/// Note that fields which are mapped to URL query parameters must have a +/// primitive type or a repeated primitive type or a non-repeated message type. +/// In the case of a repeated type, the parameter can be repeated in the URL +/// as `...?param=A¶m=B`. In the case of a message type, each field of the +/// message is mapped to a separate parameter, such as +/// `...?foo.a=A&foo.b=B&foo.c=C`. +/// +/// For HTTP methods that allow a request body, the `body` field +/// specifies the mapping. Consider a REST update method on the +/// message resource collection: +/// +/// service Messaging { +/// rpc UpdateMessage(UpdateMessageRequest) returns (Message) { +/// option (google.api.http) = { +/// patch: "/v1/messages/{message_id}" +/// body: "message" +/// }; +/// } +/// } +/// message UpdateMessageRequest { +/// string message_id = 1; // mapped to the URL +/// Message message = 2; // mapped to the body +/// } +/// +/// The following HTTP JSON to RPC mapping is enabled, where the +/// representation of the JSON in the request body is determined by +/// protos JSON encoding: +/// +/// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }` +/// - gRPC: `UpdateMessage(message_id: "123456" message { text: "Hi!" })` +/// +/// The special name `*` can be used in the body mapping to define that +/// every field not bound by the path template should be mapped to the +/// request body. This enables the following alternative definition of +/// the update method: +/// +/// service Messaging { +/// rpc UpdateMessage(Message) returns (Message) { +/// option (google.api.http) = { +/// patch: "/v1/messages/{message_id}" +/// body: "*" +/// }; +/// } +/// } +/// message Message { +/// string message_id = 1; +/// string text = 2; +/// } +/// +/// +/// The following HTTP JSON to RPC mapping is enabled: +/// +/// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }` +/// - gRPC: `UpdateMessage(message_id: "123456" text: "Hi!")` +/// +/// Note that when using `*` in the body mapping, it is not possible to +/// have HTTP parameters, as all fields not bound by the path end in +/// the body. This makes this option more rarely used in practice when +/// defining REST APIs. The common usage of `*` is in custom methods +/// which don't use the URL at all for transferring data. +/// +/// It is possible to define multiple HTTP methods for one RPC by using +/// the `additional_bindings` option. Example: +/// +/// service Messaging { +/// rpc GetMessage(GetMessageRequest) returns (Message) { +/// option (google.api.http) = { +/// get: "/v1/messages/{message_id}" +/// additional_bindings { +/// get: "/v1/users/{user_id}/messages/{message_id}" +/// } +/// }; +/// } +/// } +/// message GetMessageRequest { +/// string message_id = 1; +/// string user_id = 2; +/// } +/// +/// This enables the following two alternative HTTP JSON to RPC mappings: +/// +/// - HTTP: `GET /v1/messages/123456` +/// - gRPC: `GetMessage(message_id: "123456")` +/// +/// - HTTP: `GET /v1/users/me/messages/123456` +/// - gRPC: `GetMessage(user_id: "me" message_id: "123456")` +/// +/// Rules for HTTP mapping +/// +/// 1. Leaf request fields (recursive expansion nested messages in the request +/// message) are classified into three categories: +/// - Fields referred by the path template. They are passed via the URL path. +/// - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They +/// are passed via the HTTP +/// request body. +/// - All other fields are passed via the URL query parameters, and the +/// parameter name is the field path in the request message. A repeated +/// field can be represented as multiple query parameters under the same +/// name. +/// 2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL +/// query parameter, all fields +/// are passed via URL path and HTTP request body. +/// 3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP +/// request body, all +/// fields are passed via URL path and URL query parameters. +/// +/// Path template syntax +/// +/// Template = "/" Segments \[ Verb \] ; +/// Segments = Segment { "/" Segment } ; +/// Segment = "*" | "**" | LITERAL | Variable ; +/// Variable = "{" FieldPath \[ "=" Segments \] "}" ; +/// FieldPath = IDENT { "." IDENT } ; +/// Verb = ":" LITERAL ; +/// +/// The syntax `*` matches a single URL path segment. The syntax `**` matches +/// zero or more URL path segments, which must be the last part of the URL path +/// except the `Verb`. +/// +/// The syntax `Variable` matches part of the URL path as specified by its +/// template. A variable template must not contain other variables. If a variable +/// matches a single path segment, its template may be omitted, e.g. `{var}` +/// is equivalent to `{var=*}`. +/// +/// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL` +/// contains any reserved character, such characters should be percent-encoded +/// before the matching. +/// +/// If a variable contains exactly one path segment, such as `"{var}"` or +/// `"{var=*}"`, when such a variable is expanded into a URL path on the client +/// side, all characters except `\[-_.~0-9a-zA-Z\]` are percent-encoded. The +/// server side does the reverse decoding. Such variables show up in the +/// [Discovery +/// Document]() as +/// `{var}`. +/// +/// If a variable contains multiple path segments, such as `"{var=foo/*}"` +/// or `"{var=**}"`, when such a variable is expanded into a URL path on the +/// client side, all characters except `\[-_.~/0-9a-zA-Z\]` are percent-encoded. +/// The server side does the reverse decoding, except "%2F" and "%2f" are left +/// unchanged. Such variables show up in the +/// [Discovery +/// Document]() as +/// `{+var}`. +/// +/// Using gRPC API Service Configuration +/// +/// gRPC API Service Configuration (service config) is a configuration language +/// for configuring a gRPC service to become a user-facing product. The +/// service config is simply the YAML representation of the `google.api.Service` +/// proto message. +/// +/// As an alternative to annotating your proto file, you can configure gRPC +/// transcoding in your service config YAML files. You do this by specifying a +/// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same +/// effect as the proto annotation. This can be particularly useful if you +/// have a proto that is reused in multiple services. Note that any transcoding +/// specified in the service config will override any matching transcoding +/// configuration in the proto. +/// +/// The following example selects a gRPC method and applies an `HttpRule` to it: +/// +/// http: +/// rules: +/// - selector: example.v1.Messaging.GetMessage +/// get: /v1/messages/{message_id}/{sub.subfield} +/// +/// Special notes +/// +/// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the +/// proto to JSON conversion must follow the [proto3 +/// specification](). +/// +/// While the single segment variable follows the semantics of +/// [RFC 6570]() Section 3.2.2 Simple String +/// Expansion, the multi segment variable **does not** follow RFC 6570 Section +/// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion +/// does not expand special characters like `?` and `#`, which would lead +/// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding +/// for multi segment variables. +/// +/// The path variables **must not** refer to any repeated or mapped field, +/// because client libraries are not capable of handling such variable expansion. +/// +/// The path variables **must not** capture the leading "/" character. The reason +/// is that the most common use case "{var}" does not capture the leading "/" +/// character. For consistency, all path variables must share the same behavior. +/// +/// Repeated message fields must not be mapped to URL query parameters, because +/// no client library can support such complicated mapping. +/// +/// If an API needs to use a JSON array for request or response body, it can map +/// the request or response body to a repeated field. However, some gRPC +/// Transcoding implementations may not support this feature. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct HttpRule { + /// Selects a method to which this rule applies. + /// + /// Refer to [selector][google.api.DocumentationRule.selector] for syntax + /// details. + #[prost(string, tag = "1")] + pub selector: ::prost::alloc::string::String, + /// The name of the request field whose value is mapped to the HTTP request + /// body, or `*` for mapping all request fields not captured by the path + /// pattern to the HTTP body, or omitted for not having any HTTP request body. + /// + /// NOTE: the referred field must be present at the top-level of the request + /// message type. + #[prost(string, tag = "7")] + pub body: ::prost::alloc::string::String, + /// Optional. The name of the response field whose value is mapped to the HTTP + /// response body. When omitted, the entire response message will be used + /// as the HTTP response body. + /// + /// NOTE: The referred field must be present at the top-level of the response + /// message type. + #[prost(string, tag = "12")] + pub response_body: ::prost::alloc::string::String, + /// Additional HTTP bindings for the selector. Nested bindings must + /// not contain an `additional_bindings` field themselves (that is, + /// the nesting may only be one level deep). + #[prost(message, repeated, tag = "11")] + pub additional_bindings: ::prost::alloc::vec::Vec, + /// Determines the URL pattern is matched by this rules. This pattern can be + /// used with any of the {get|put|post|delete|patch} methods. A custom method + /// can be defined using the 'custom' field. + #[prost(oneof = "http_rule::Pattern", tags = "2, 3, 4, 5, 6, 8")] + pub pattern: ::core::option::Option, +} +/// Nested message and enum types in `HttpRule`. +pub mod http_rule { + /// Determines the URL pattern is matched by this rules. This pattern can be + /// used with any of the {get|put|post|delete|patch} methods. A custom method + /// can be defined using the 'custom' field. + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Pattern { + /// Maps to HTTP GET. Used for listing and getting information about + /// resources. + #[prost(string, tag = "2")] + Get(::prost::alloc::string::String), + /// Maps to HTTP PUT. Used for replacing a resource. + #[prost(string, tag = "3")] + Put(::prost::alloc::string::String), + /// Maps to HTTP POST. Used for creating a resource or performing an action. + #[prost(string, tag = "4")] + Post(::prost::alloc::string::String), + /// Maps to HTTP DELETE. Used for deleting a resource. + #[prost(string, tag = "5")] + Delete(::prost::alloc::string::String), + /// Maps to HTTP PATCH. Used for updating a resource. + #[prost(string, tag = "6")] + Patch(::prost::alloc::string::String), + /// The custom pattern is used for specifying an HTTP method that is not + /// included in the `pattern` field, such as HEAD, or "*" to leave the + /// HTTP method unspecified for this rule. The wild-card rule is useful + /// for services that provide content to Web (HTML) clients. + #[prost(message, tag = "8")] + Custom(super::CustomHttpPattern), + } +} +/// A custom pattern is used for defining custom HTTP verb. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CustomHttpPattern { + /// The name of this custom HTTP verb. + #[prost(string, tag = "1")] + pub kind: ::prost::alloc::string::String, + /// The path matched by this custom verb. + #[prost(string, tag = "2")] + pub path: ::prost::alloc::string::String, +} diff --git a/rust/numaflow-pb/src/clients/mvtxdaemon.rs b/rust/numaflow-pb/src/clients/mvtxdaemon.rs new file mode 100644 index 000000000..38716f3f3 --- /dev/null +++ b/rust/numaflow-pb/src/clients/mvtxdaemon.rs @@ -0,0 +1,224 @@ +// This file is @generated by prost-build. +/// MonoVertexMetrics is used to provide information about the mono vertex including processing rate. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MonoVertexMetrics { + #[prost(string, tag = "1")] + pub mono_vertex: ::prost::alloc::string::String, + /// Processing rate in the past period of time, 1m, 5m, 15m, default + #[prost(map = "string, message", tag = "2")] + pub processing_rates: ::std::collections::HashMap< + ::prost::alloc::string::String, + f64, + >, + /// Pending in the past period of time, 1m, 5m, 15m, default + #[prost(map = "string, message", tag = "3")] + pub pendings: ::std::collections::HashMap<::prost::alloc::string::String, i64>, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetMonoVertexMetricsResponse { + #[prost(message, optional, tag = "1")] + pub metrics: ::core::option::Option, +} +/// MonoVertexStatus is used to provide information about the mono vertex status. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MonoVertexStatus { + #[prost(string, tag = "1")] + pub status: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub code: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetMonoVertexStatusResponse { + #[prost(message, optional, tag = "1")] + pub status: ::core::option::Option, +} +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct GetMonoVertexLookbackResponse { + #[prost(message, optional, tag = "1")] + pub lookback: ::core::option::Option, +} +/// Generated client implementations. +pub mod mono_vertex_daemon_service_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// MonoVertexDaemonService is a grpc service that is used to provide APIs for giving any MonoVertex information. + #[derive(Debug, Clone)] + pub struct MonoVertexDaemonServiceClient { + inner: tonic::client::Grpc, + } + impl MonoVertexDaemonServiceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl MonoVertexDaemonServiceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> MonoVertexDaemonServiceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + MonoVertexDaemonServiceClient::new( + InterceptedService::new(inner, interceptor), + ) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + pub async fn get_mono_vertex_metrics( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "mvtxdaemon.MonoVertexDaemonService", + "GetMonoVertexMetrics", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_mono_vertex_status( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexStatus", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "mvtxdaemon.MonoVertexDaemonService", + "GetMonoVertexStatus", + ), + ); + self.inner.unary(req, path, codec).await + } + pub async fn get_mono_vertex_lookback( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexLookback", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "mvtxdaemon.MonoVertexDaemonService", + "GetMonoVertexLookback", + ), + ); + self.inner.unary(req, path, codec).await + } + } +} diff --git a/rust/numaflow-pb/src/main.rs b/rust/numaflow-pb/src/main.rs index 80e239089..21c16f7e9 100644 --- a/rust/numaflow-pb/src/main.rs +++ b/rust/numaflow-pb/src/main.rs @@ -4,6 +4,9 @@ fn main() { // protobuf objects for serde build_objects(); + + // gRPC clients for daemon + build_daemon(); } fn build_client() { @@ -36,3 +39,19 @@ fn build_objects() { ) .expect("failed to compile protos"); } + +fn build_daemon() { + tonic_build::configure() + .build_client(true) + .build_server(false) + .out_dir("src/clients") + .compile_protos( + &[ + "proto/mvtxdaemon/mvtxdaemon.proto", + "proto/google/api/annotations.proto", + "proto/google/api/http.proto", + ], + &["proto", "proto/google/api"], + ) + .expect("failed to compile protos"); +}