From f00685a15983330980447113086f31eede200276 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Fri, 13 Sep 2024 22:58:58 -0400 Subject: [PATCH 1/3] chore: remove server info dependencies on go sdk (#2060) Signed-off-by: Keran Yang --- api/json-schema/schema.json | 2 +- api/openapi-spec/swagger.json | 2 +- docs/APIs.md | 8 +- pkg/apis/numaflow/v1alpha1/generated.proto | 2 +- .../numaflow/v1alpha1/mono_vertex_types.go | 2 +- .../numaflow/v1alpha1/openapi_generated.go | 2 +- pkg/sdkclient/batchmapper/client.go | 7 +- pkg/sdkclient/grpc/grpc_utils.go | 7 +- pkg/sdkclient/mapper/client.go | 5 +- pkg/sdkclient/mapstreamer/client.go | 6 +- pkg/sdkclient/reducer/client.go | 6 +- pkg/sdkclient/serverinfo/serverinfo.go | 103 ++++-- pkg/sdkclient/serverinfo/serverinfo_test.go | 136 ++++++- pkg/sdkclient/serverinfo/types.go | 69 ++++ pkg/sdkclient/serverinfo/versions.go | 29 -- pkg/sdkclient/sessionreducer/client.go | 5 +- pkg/sdkclient/sideinput/client.go | 6 +- pkg/sdkclient/sinker/client.go | 6 +- pkg/sdkclient/source/client.go | 6 +- pkg/sdkclient/sourcetransformer/client.go | 5 +- pkg/sideinputs/manager/manager.go | 4 +- pkg/sinks/sink.go | 6 +- pkg/sources/source.go | 6 +- pkg/udf/map_udf.go | 12 +- pkg/udf/reduce_udf.go | 16 +- rust/Cargo.lock | 350 +++++++++--------- rust/monovertex/src/lib.rs | 8 +- rust/monovertex/src/server_info.rs | 34 +- 28 files changed, 531 insertions(+), 319 deletions(-) create mode 100644 pkg/sdkclient/serverinfo/types.go delete mode 100644 pkg/sdkclient/serverinfo/versions.go diff --git a/api/json-schema/schema.json b/api/json-schema/schema.json index bf1f28f594..fefd397d8c 100644 --- a/api/json-schema/schema.json +++ b/api/json-schema/schema.json @@ -19203,7 +19203,7 @@ }, "limits": { "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.MonoVertexLimits", - "description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings" + "description": "Limits define the limitations such as read batch size for the mono vertex." }, "metadata": { "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Metadata", diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 91e6e43fb6..f688e51c8d 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -19198,7 +19198,7 @@ } }, "limits": { - "description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings", + "description": "Limits define the limitations such as read batch size for the mono vertex.", "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.MonoVertexLimits" }, "metadata": { diff --git a/docs/APIs.md b/docs/APIs.md index da6ab6eeb1..d5deaa76b2 100644 --- a/docs/APIs.md +++ b/docs/APIs.md @@ -5720,8 +5720,8 @@ MonoVertexLimits (Optional)

-Limits define the limitations such as buffer read batch size for all the -vertices of a pipeline, will override pipeline level settings +Limits define the limitations such as read batch size for the mono +vertex.

@@ -6109,8 +6109,8 @@ MonoVertexLimits (Optional)

-Limits define the limitations such as buffer read batch size for all the -vertices of a pipeline, will override pipeline level settings +Limits define the limitations such as read batch size for the mono +vertex.

diff --git a/pkg/apis/numaflow/v1alpha1/generated.proto b/pkg/apis/numaflow/v1alpha1/generated.proto index c927033e7e..70936b9d47 100644 --- a/pkg/apis/numaflow/v1alpha1/generated.proto +++ b/pkg/apis/numaflow/v1alpha1/generated.proto @@ -928,7 +928,7 @@ message MonoVertexSpec { // +patchMergeKey=name repeated k8s.io.api.core.v1.Volume volumes = 6; - // Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings + // Limits define the limitations such as read batch size for the mono vertex. // +optional optional MonoVertexLimits limits = 7; diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go index 677ec4fc5c..934d497878 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go @@ -408,7 +408,7 @@ type MonoVertexSpec struct { // +patchStrategy=merge // +patchMergeKey=name Volumes []corev1.Volume `json:"volumes,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,6,rep,name=volumes"` - // Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings + // Limits define the limitations such as read batch size for the mono vertex. // +optional Limits *MonoVertexLimits `json:"limits,omitempty" protobuf:"bytes,7,opt,name=limits"` // Settings for autoscaling diff --git a/pkg/apis/numaflow/v1alpha1/openapi_generated.go b/pkg/apis/numaflow/v1alpha1/openapi_generated.go index 50a03d54a3..5987162567 100644 --- a/pkg/apis/numaflow/v1alpha1/openapi_generated.go +++ b/pkg/apis/numaflow/v1alpha1/openapi_generated.go @@ -3296,7 +3296,7 @@ func schema_pkg_apis_numaflow_v1alpha1_MonoVertexSpec(ref common.ReferenceCallba }, "limits": { SchemaProps: spec.SchemaProps{ - Description: "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings", + Description: "Limits define the limitations such as read batch size for the mono vertex.", Ref: ref("github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1.MonoVertexLimits"), }, }, diff --git a/pkg/sdkclient/batchmapper/client.go b/pkg/sdkclient/batchmapper/client.go index 7c6db2f608..5cc1718492 100644 --- a/pkg/sdkclient/batchmapper/client.go +++ b/pkg/sdkclient/batchmapper/client.go @@ -21,15 +21,14 @@ import ( "errors" "io" + batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" - "github.com/numaproj/numaflow-go/pkg/info" - "github.com/numaproj/numaflow/pkg/sdkclient" sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -39,7 +38,7 @@ type client struct { } // New creates a new client object. -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.BatchMapAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/grpc/grpc_utils.go b/pkg/sdkclient/grpc/grpc_utils.go index 6d3574a290..42fba83e86 100644 --- a/pkg/sdkclient/grpc/grpc_utils.go +++ b/pkg/sdkclient/grpc/grpc_utils.go @@ -21,22 +21,21 @@ import ( "log" "strconv" - "github.com/numaproj/numaflow-go/pkg/info" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" resolver "github.com/numaproj/numaflow/pkg/sdkclient/grpc_resolver" - sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // ConnectToServer connects to the server with the given socket address based on the server info protocol. -func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) { +func ConnectToServer(udsSockAddr string, serverInfo *serverinfo.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) { var conn *grpc.ClientConn var err error var sockAddr string // Check if Multiproc server mode is enabled - if multiProcServer, ok := serverInfo.Metadata[sdkserverinfo.MultiProcMetadata]; ok { + if multiProcServer, ok := serverInfo.Metadata[serverinfo.MultiProcKey]; ok { // Extract the server ports from the server info file numServers, _ := strconv.Atoi(multiProcServer) // In Multiprocessing server mode we have multiple servers forks diff --git a/pkg/sdkclient/mapper/client.go b/pkg/sdkclient/mapper/client.go index d22c852906..07ef848a09 100644 --- a/pkg/sdkclient/mapper/client.go +++ b/pkg/sdkclient/mapper/client.go @@ -23,10 +23,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" - "github.com/numaproj/numaflow-go/pkg/info" + "github.com/numaproj/numaflow/pkg/sdkclient" sdkerror "github.com/numaproj/numaflow/pkg/sdkclient/error" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -36,7 +37,7 @@ type client struct { } // New creates a new client object. -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.MapAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/mapstreamer/client.go b/pkg/sdkclient/mapstreamer/client.go index 9b512d22ad..ff5d07a7a6 100644 --- a/pkg/sdkclient/mapstreamer/client.go +++ b/pkg/sdkclient/mapstreamer/client.go @@ -21,14 +21,14 @@ import ( "fmt" "io" + mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" - "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow/pkg/sdkclient" sdkerror "github.com/numaproj/numaflow/pkg/sdkclient/error" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -38,7 +38,7 @@ type client struct { } // New creates a new client object. -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.MapStreamAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/reducer/client.go b/pkg/sdkclient/reducer/client.go index 64580c831d..6825fdc4c2 100644 --- a/pkg/sdkclient/reducer/client.go +++ b/pkg/sdkclient/reducer/client.go @@ -21,14 +21,14 @@ import ( "errors" "io" + reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1" - "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow/pkg/sdkclient" sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -38,7 +38,7 @@ type client struct { } // New creates a new client object. -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.ReduceAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/serverinfo/serverinfo.go b/pkg/sdkclient/serverinfo/serverinfo.go index aa1cdde29d..932ab2ff50 100644 --- a/pkg/sdkclient/serverinfo/serverinfo.go +++ b/pkg/sdkclient/serverinfo/serverinfo.go @@ -18,39 +18,23 @@ package serverinfo import ( "context" + "encoding/json" "fmt" "log" + "os" "strings" "time" "github.com/Masterminds/semver/v3" pep440 "github.com/aquasecurity/go-pep440-version" - "github.com/numaproj/numaflow-go/pkg/info" - "github.com/numaproj/numaflow" ) -// Metadata keys used in the server info file -const ( - // MultiProcMetadata is the field used to indicate that MultiProc map mode is enabled - // The value contains the number of servers spawned. - MultiProcMetadata = "MULTIPROC" - // MapModeMetadata field is used to indicate which map mode is enabled - // If none is set, we consider unary map as default - MapModeMetadata = "MAP_MODE" -) - -type MapMode string - -const ( - UnaryMap MapMode = "unary-map" - StreamMap MapMode = "stream-map" - BatchMap MapMode = "batch-map" -) +var END = fmt.Sprintf("%U__END__", '\\') // SDKServerInfo wait for the server to start and return the server info. -func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) { +func SDKServerInfo(inputOptions ...Option) (*ServerInfo, error) { var opts = DefaultOptions() for _, inputOption := range inputOptions { @@ -68,33 +52,32 @@ func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) { } // waitForServerInfo waits until the server info is ready. It returns an error if the server info is not ready within the given timeout -func waitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo, error) { +func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - if err := info.WaitUntilReady(ctx, info.WithServerInfoFilePath(filePath)); err != nil { + if err := waitUntilReady(ctx, WithServerInfoFilePath(filePath)); err != nil { return nil, fmt.Errorf("failed to wait until server info is ready: %w", err) } - - serverInfo, err := info.Read(info.WithServerInfoFilePath(filePath)) + serverInfo, err := read(WithServerInfoFilePath(filePath)) if err != nil { return nil, fmt.Errorf("failed to read server info: %w", err) } - sdkVersion := serverInfo.Version minNumaflowVersion := serverInfo.MinimumNumaflowVersion sdkLanguage := serverInfo.Language numaflowVersion := numaflow.GetVersion().Version // If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an - // error writing server info on the SDK side + // error writing server info file on the SDK side if minNumaflowVersion == "" { log.Printf("warning: failed to get the minimum numaflow version, skipping numaflow version compatibility check") // If we are testing locally or in CI, we can skip checking for numaflow compatibility issues - // because both return us a version string that the version check libraries can't properly parse (local: "*latest*" CI: commit SHA) + // because both return us a version string that the version-check libraries can't properly parse, + // local: "*latest*", CI: commit SHA } else if !strings.Contains(numaflowVersion, "latest") && !strings.Contains(numaflowVersion, numaflow.GetVersion().GitCommit) { if err := checkNumaflowCompatibility(numaflowVersion, minNumaflowVersion); err != nil { - return nil, fmt.Errorf("numaflow %s does not satisfy the minimum required by SDK %s: %w", + return nil, fmt.Errorf("numaflow version %s does not satisfy the minimum required by SDK version %s: %w", numaflowVersion, sdkVersion, err) } } @@ -105,14 +88,66 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo log.Printf("warning: failed to get the SDK version/language, skipping SDK version compatibility check") } else { if err := checkSDKCompatibility(sdkVersion, sdkLanguage, minimumSupportedSDKVersions); err != nil { - return nil, fmt.Errorf("SDK %s does not satisfy the minimum required by numaflow %s: %w", + return nil, fmt.Errorf("SDK version %s does not satisfy the minimum required by numaflow version %s: %w", sdkVersion, numaflowVersion, err) } } - return serverInfo, nil } +// waitUntilReady waits until the server info is ready +func waitUntilReady(ctx context.Context, opts ...Option) error { + options := DefaultOptions() + for _, opt := range opts { + opt(options) + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if fileInfo, err := os.Stat(options.serverInfoFilePath); err != nil { + log.Printf("Server info file %s is not ready...", options.serverInfoFilePath) + time.Sleep(1 * time.Second) + continue + } else { + if fileInfo.Size() > 0 { + return nil + } + } + } + } +} + +// read reads the server info from a file +func read(opts ...Option) (*ServerInfo, error) { + options := DefaultOptions() + for _, opt := range opts { + opt(options) + } + // It takes some time for the server to write the server info file + // TODO: use a better way to wait for the file to be ready + retry := 0 + b, err := os.ReadFile(options.serverInfoFilePath) + for !strings.HasSuffix(string(b), END) && err == nil && retry < 10 { + time.Sleep(100 * time.Millisecond) + b, err = os.ReadFile(options.serverInfoFilePath) + retry++ + } + if err != nil { + return nil, err + } + if !strings.HasSuffix(string(b), END) { + return nil, fmt.Errorf("server info file is not ready") + } + b = b[:len(b)-len([]byte(END))] + info := &ServerInfo{} + if err := json.Unmarshal(b, info); err != nil { + return nil, fmt.Errorf("failed to unmarshal server info: %w", err) + } + return info, nil +} + func checkConstraint(version *semver.Version, constraint string) error { if c, err := semver.NewConstraint(constraint); err != nil { return fmt.Errorf("error parsing constraint: %w, constraint string: %s", err, constraint) @@ -128,26 +163,23 @@ func checkNumaflowCompatibility(numaflowVersion string, minNumaflowVersion strin if minNumaflowVersion == "" { return fmt.Errorf("server info does not contain minimum numaflow version. Upgrade to newer SDK version") } - numaflowVersionSemVer, err := semver.NewVersion(numaflowVersion) if err != nil { return fmt.Errorf("error parsing numaflow version: %w", err) } - numaflowConstraint := fmt.Sprintf(">= %s", minNumaflowVersion) if err = checkConstraint(numaflowVersionSemVer, numaflowConstraint); err != nil { return fmt.Errorf("numaflow version %s must be upgraded to at least %s, in order to work with current SDK version: %w", numaflowVersionSemVer.String(), minNumaflowVersion, err) } - return nil } // checkSDKCompatibility checks if the current SDK version is compatible with the numaflow version -func checkSDKCompatibility(sdkVersion string, sdkLanguage info.Language, minSupportedSDKVersions sdkConstraints) error { +func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, minSupportedSDKVersions sdkConstraints) error { if sdkRequiredVersion, ok := minSupportedSDKVersions[sdkLanguage]; ok { sdkConstraint := fmt.Sprintf(">= %s", sdkRequiredVersion) - if sdkLanguage == info.Python { + if sdkLanguage == Python { // Python pre-releases/releases follow PEP440 specification which requires a different library for parsing sdkVersionPEP440, err := pep440.Parse(sdkVersion) if err != nil { @@ -175,6 +207,5 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage info.Language, minSupp } } } - return nil } diff --git a/pkg/sdkclient/serverinfo/serverinfo_test.go b/pkg/sdkclient/serverinfo/serverinfo_test.go index 90a683a81f..e96919e243 100644 --- a/pkg/sdkclient/serverinfo/serverinfo_test.go +++ b/pkg/sdkclient/serverinfo/serverinfo_test.go @@ -17,20 +17,78 @@ limitations under the License. package serverinfo import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" "testing" - - "github.com/numaproj/numaflow-go/pkg/info" + "time" "github.com/stretchr/testify/assert" ) -var testMinimumSupportedSDKVersions = sdkConstraints{ - info.Go: "0.6.0-0", - info.Python: "0.6.0a", - info.Java: "0.6.0-0", +func Test_SDKServerInfo(t *testing.T) { + filepath := os.TempDir() + "/server-info" + defer os.Remove(filepath) + info := &ServerInfo{ + Protocol: TCP, + Language: Java, + MinimumNumaflowVersion: "1.3.0-rc1", + Version: "v0.8.0", + Metadata: map[string]string{"key1": "value1", "key2": "value2"}, + } + err := write(info, WithServerInfoFilePath(filepath)) + assert.NoError(t, err) + got, err := SDKServerInfo(WithServerInfoFilePath(filepath)) + assert.NoError(t, err) + assert.Equal(t, info, got) +} + +func Test_WaitUntilReady(t *testing.T) { + serverInfoFile, err := os.CreateTemp("/tmp", "server-info") + assert.NoError(t, err) + defer os.Remove(serverInfoFile.Name()) + err = os.WriteFile(serverInfoFile.Name(), []byte("test"), 0644) + assert.NoError(t, err) + + t.Run("test timeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + err := waitUntilReady(ctx, WithServerInfoFilePath("/tmp/not-exist")) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + }) + + t.Run("test success", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + err = waitUntilReady(ctx, WithServerInfoFilePath(serverInfoFile.Name())) + assert.NoError(t, err) + }) +} + +func Test_ReadServerInfoFile(t *testing.T) { + filepath := os.TempDir() + "/server-info" + defer os.Remove(filepath) + info := &ServerInfo{ + Protocol: TCP, + Language: Java, + MinimumNumaflowVersion: "1.3.0-rc1", + Version: "v0.8.0", + Metadata: map[string]string{"key1": "value1", "key2": "value2"}, + } + err := write(info, WithServerInfoFilePath(filepath)) + assert.NoError(t, err) + got, err := read(WithServerInfoFilePath("/tmp/not-exist")) + assert.Error(t, err) + assert.True(t, os.IsNotExist(err)) + assert.Nil(t, got) + got, err = read(WithServerInfoFilePath(filepath)) + assert.NoError(t, err) + assert.Equal(t, info, got) } -func TestCheckNumaflowCompatibility(t *testing.T) { +func Test_CheckNumaflowCompatibility(t *testing.T) { tests := []struct { name string numaflowVersion string @@ -59,7 +117,6 @@ func TestCheckNumaflowCompatibility(t *testing.T) { shouldErr: false, }, } - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { err := checkNumaflowCompatibility(tt.numaflowVersion, tt.minNumaflowVersion) @@ -73,11 +130,17 @@ func TestCheckNumaflowCompatibility(t *testing.T) { } } -func TestCheckSDKCompatibility(t *testing.T) { +func Test_CheckSDKCompatibility(t *testing.T) { + var testMinimumSupportedSDKVersions = sdkConstraints{ + Go: "0.6.0-0", + Python: "0.6.0a", + Java: "0.6.0-0", + Rust: "0.1.0", + } tests := []struct { name string sdkVersion string - sdkLanguage info.Language + sdkLanguage Language minimumSupportedSDKVersions sdkConstraints shouldErr bool errMessage string @@ -85,7 +148,7 @@ func TestCheckSDKCompatibility(t *testing.T) { { name: "Test with incompatible Python version", sdkVersion: "v0.5.3a1", - sdkLanguage: info.Python, + sdkLanguage: Python, minimumSupportedSDKVersions: testMinimumSupportedSDKVersions, shouldErr: true, errMessage: "SDK version 0.5.3a1 must be upgraded to at least 0.6.0a, in order to work with current numaflow version", @@ -93,14 +156,14 @@ func TestCheckSDKCompatibility(t *testing.T) { { name: "Test with compatible Python version", sdkVersion: "v0.6.0a2", - sdkLanguage: info.Python, + sdkLanguage: Python, minimumSupportedSDKVersions: testMinimumSupportedSDKVersions, shouldErr: false, }, { name: "Test with incompatible Java version", sdkVersion: "v0.4.3", - sdkLanguage: info.Java, + sdkLanguage: Java, minimumSupportedSDKVersions: testMinimumSupportedSDKVersions, shouldErr: true, errMessage: "SDK version 0.4.3 must be upgraded to at least 0.6.0-0, in order to work with current numaflow version", @@ -108,12 +171,26 @@ func TestCheckSDKCompatibility(t *testing.T) { { name: "Test with compatible Go version", sdkVersion: "v0.6.0-rc2", - sdkLanguage: info.Go, + sdkLanguage: Go, + minimumSupportedSDKVersions: testMinimumSupportedSDKVersions, + shouldErr: false, + }, + { + name: "Test with incompatible Rust version", + sdkVersion: "v0.0.3", + sdkLanguage: Rust, + minimumSupportedSDKVersions: testMinimumSupportedSDKVersions, + shouldErr: true, + errMessage: "SDK version 0.0.3 must be upgraded to at least 0.1.0, in order to work with current numaflow version", + }, + { + name: "Test with compatible Rust version", + sdkVersion: "v0.1.1", + sdkLanguage: Rust, minimumSupportedSDKVersions: testMinimumSupportedSDKVersions, shouldErr: false, }, } - for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, tt.minimumSupportedSDKVersions) @@ -126,3 +203,32 @@ func TestCheckSDKCompatibility(t *testing.T) { }) } } + +// write is a test helper function to prepare server info file +func write(svrInfo *ServerInfo, opts ...Option) error { + b, err := json.Marshal(svrInfo) + if err != nil { + return fmt.Errorf("failed to marshal server info: %w", err) + } + options := DefaultOptions() + for _, opt := range opts { + opt(options) + } + if err := os.Remove(options.serverInfoFilePath); !os.IsNotExist(err) && err != nil { + return fmt.Errorf("failed to remove server-info file: %w", err) + } + f, err := os.Create(options.serverInfoFilePath) + if err != nil { + return fmt.Errorf("failed to create server-info file: %w", err) + } + defer f.Close() + _, err = f.Write(b) + if err != nil { + return fmt.Errorf("failed to write server-info file: %w", err) + } + _, err = f.WriteString(END) + if err != nil { + return fmt.Errorf("failed to write END server-info file: %w", err) + } + return nil +} diff --git a/pkg/sdkclient/serverinfo/types.go b/pkg/sdkclient/serverinfo/types.go new file mode 100644 index 0000000000..fc8fdd9b81 --- /dev/null +++ b/pkg/sdkclient/serverinfo/types.go @@ -0,0 +1,69 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serverinfo + +type Language string + +const ( + Go Language = "go" + Python Language = "python" + Java Language = "java" + Rust Language = "rust" +) + +type sdkConstraints map[Language]string + +var minimumSupportedSDKVersions = sdkConstraints{ + Go: "0.8.0", + Python: "0.8.0", + Java: "0.8.0", + Rust: "0.1.0", +} + +type Protocol string + +const ( + UDS Protocol = "uds" + TCP Protocol = "tcp" +) + +type MapMode string + +const ( + UnaryMap MapMode = "unary-map" + StreamMap MapMode = "stream-map" + BatchMap MapMode = "batch-map" +) + +// Metadata keys used in the server info file +const ( + // MultiProcKey is the field used to indicate that MultiProc map mode is enabled + // The value contains the number of servers spawned. + MultiProcKey = "MULTIPROC" + // MapModeKey field is used to indicate which map mode is enabled + // If none is set, we consider the unary map as default + MapModeKey = "MAP_MODE" +) + +// ServerInfo is the information about the server +type ServerInfo struct { + Protocol Protocol `json:"protocol"` + Language Language `json:"language"` + MinimumNumaflowVersion string `json:"minimum_numaflow_version"` + Version string `json:"version"` + Metadata map[string]string `json:"metadata"` +} diff --git a/pkg/sdkclient/serverinfo/versions.go b/pkg/sdkclient/serverinfo/versions.go deleted file mode 100644 index aded1c9bd3..0000000000 --- a/pkg/sdkclient/serverinfo/versions.go +++ /dev/null @@ -1,29 +0,0 @@ -/* -Copyright 2022 The Numaproj Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package serverinfo - -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - -type sdkConstraints map[info.Language]string - -var minimumSupportedSDKVersions = sdkConstraints{ - info.Go: "0.8.0", - info.Python: "0.8.0", - info.Java: "0.8.0", -} diff --git a/pkg/sdkclient/sessionreducer/client.go b/pkg/sdkclient/sessionreducer/client.go index f2ff4d6770..a2c8b12f8e 100644 --- a/pkg/sdkclient/sessionreducer/client.go +++ b/pkg/sdkclient/sessionreducer/client.go @@ -25,10 +25,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" sessionreducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/sessionreduce/v1" - "github.com/numaproj/numaflow-go/pkg/info" + "github.com/numaproj/numaflow/pkg/sdkclient" sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -38,7 +39,7 @@ type client struct { } // New creates a new client object. -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.SessionReduceAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/sideinput/client.go b/pkg/sdkclient/sideinput/client.go index 8b9f9730f2..48c101f260 100644 --- a/pkg/sdkclient/sideinput/client.go +++ b/pkg/sdkclient/sideinput/client.go @@ -21,13 +21,13 @@ import ( "fmt" "time" + sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1" - "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow/pkg/sdkclient" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -39,7 +39,7 @@ type client struct { var _ Client = (*client)(nil) // New creates a new client object. -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (*client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (*client, error) { var opts = sdkclient.DefaultOptions(sdkclient.SideInputAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/sinker/client.go b/pkg/sdkclient/sinker/client.go index e8249d1859..67fe08557c 100644 --- a/pkg/sdkclient/sinker/client.go +++ b/pkg/sdkclient/sinker/client.go @@ -20,13 +20,13 @@ import ( "context" "fmt" + sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" - "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow/pkg/sdkclient" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -37,7 +37,7 @@ type client struct { var _ Client = (*client)(nil) -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.SinkAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/source/client.go b/pkg/sdkclient/source/client.go index c5275b3c77..39b96a13ed 100644 --- a/pkg/sdkclient/source/client.go +++ b/pkg/sdkclient/source/client.go @@ -21,13 +21,13 @@ import ( "fmt" "io" + sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" - "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow/pkg/sdkclient" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -38,7 +38,7 @@ type client struct { var _ Client = (*client)(nil) -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.SourceAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sdkclient/sourcetransformer/client.go b/pkg/sdkclient/sourcetransformer/client.go index 1893640a73..d9d47302c0 100644 --- a/pkg/sdkclient/sourcetransformer/client.go +++ b/pkg/sdkclient/sourcetransformer/client.go @@ -23,10 +23,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" transformpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" - "github.com/numaproj/numaflow-go/pkg/info" + "github.com/numaproj/numaflow/pkg/sdkclient" sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error" grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" ) // client contains the grpc connection and the grpc client. @@ -36,7 +37,7 @@ type client struct { } // New creates a new client object. -func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { +func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) { var opts = sdkclient.DefaultOptions(sdkclient.SourceTransformerAddr) for _, inputOption := range inputOptions { diff --git a/pkg/sideinputs/manager/manager.go b/pkg/sideinputs/manager/manager.go index 84de31a2c9..5aea876c72 100644 --- a/pkg/sideinputs/manager/manager.go +++ b/pkg/sideinputs/manager/manager.go @@ -28,7 +28,7 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isbsvc" "github.com/numaproj/numaflow/pkg/sdkclient" - sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" "github.com/numaproj/numaflow/pkg/sdkclient/sideinput" jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" "github.com/numaproj/numaflow/pkg/shared/kvs" @@ -83,7 +83,7 @@ func (sim *sideInputsManager) Start(ctx context.Context) error { } // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.SideInputServerInfoFile)) + serverInfo, err := serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.SideInputServerInfoFile)) if err != nil { return err } diff --git a/pkg/sinks/sink.go b/pkg/sinks/sink.go index f18f545127..413d232799 100644 --- a/pkg/sinks/sink.go +++ b/pkg/sinks/sink.go @@ -32,7 +32,7 @@ import ( "github.com/numaproj/numaflow/pkg/isbsvc" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/sdkclient" - sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" sinkclient "github.com/numaproj/numaflow/pkg/sdkclient/sinker" "github.com/numaproj/numaflow/pkg/shared/callback" jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" @@ -150,7 +150,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { maxMessageSize := sharedutil.LookupEnvIntOr(dfv1.EnvGRPCMaxMessageSize, sdkclient.DefaultGRPCMaxMessageSize) if udSink := u.VertexInstance.Vertex.Spec.Sink.UDSink; udSink != nil { // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.SinkServerInfoFile)) + serverInfo, err := serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.SinkServerInfoFile)) if err != nil { return err } @@ -179,7 +179,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error { if u.VertexInstance.Vertex.HasFallbackUDSink() { // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.FbSinkServerInfoFile)) + serverInfo, err := serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.FbSinkServerInfoFile)) if err != nil { return err } diff --git a/pkg/sources/source.go b/pkg/sources/source.go index 8a8e64ffd6..e206d76dbc 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -34,7 +34,7 @@ import ( "github.com/numaproj/numaflow/pkg/isbsvc" "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/sdkclient" - sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" sourceclient "github.com/numaproj/numaflow/pkg/sdkclient/source" "github.com/numaproj/numaflow/pkg/sdkclient/sourcetransformer" "github.com/numaproj/numaflow/pkg/shared/callback" @@ -196,7 +196,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { var udsGRPCClient *udsource.GRPCBasedUDSource if sp.VertexInstance.Vertex.IsUDSource() { // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.SourceServerInfoFile)) + serverInfo, err := serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.SourceServerInfoFile)) if err != nil { return err } @@ -235,7 +235,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { if sp.VertexInstance.Vertex.HasUDTransformer() { // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.SourceTransformerServerInfoFile)) + serverInfo, err := serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.SourceTransformerServerInfoFile)) if err != nil { return err } diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index 44cb1b5aa7..5d926751f0 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -33,7 +33,7 @@ import ( "github.com/numaproj/numaflow/pkg/sdkclient/batchmapper" "github.com/numaproj/numaflow/pkg/sdkclient/mapper" "github.com/numaproj/numaflow/pkg/sdkclient/mapstreamer" - sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" "github.com/numaproj/numaflow/pkg/shared/callback" jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" "github.com/numaproj/numaflow/pkg/shared/logging" @@ -138,16 +138,16 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { maxMessageSize := sharedutil.LookupEnvIntOr(dfv1.EnvGRPCMaxMessageSize, sdkclient.DefaultGRPCMaxMessageSize) // Wait for map server info to be ready, we use the same info file for all the map modes - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile)) + serverInfo, err := serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.MapServerInfoFile)) if err != nil { return err } // Read the server info file to read which map mode is enabled // Based on the value set, we will create the corresponding handler and clients - mapMode, ok := serverInfo.Metadata[sdkserverinfo.MapModeMetadata] + mapMode, ok := serverInfo.Metadata[serverinfo.MapModeKey] - if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.StreamMap) { + if ok && (serverinfo.MapMode(mapMode) == serverinfo.StreamMap) { log.Info("Map mode enabled: Stream Map") // Map Stream mode enableMapUdfStream = true @@ -170,9 +170,9 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { }() opts = append(opts, forward.WithUDFStreamingMap(mapStreamHandler)) - } else if ok && (sdkserverinfo.MapMode(mapMode) == sdkserverinfo.BatchMap) { + } else if ok && (serverinfo.MapMode(mapMode) == serverinfo.BatchMap) { log.Info("Map mode enabled: Batch Map") - // if Batch Map mode is enabled create the client and handler for that accordingly + // if Batch Map mode is enabled, create the client and handler for that accordingly enableBatchMapUdf = true // create the client and handler for batch map interface diff --git a/pkg/udf/reduce_udf.go b/pkg/udf/reduce_udf.go index eec3df3ff4..a1c9500f2e 100644 --- a/pkg/udf/reduce_udf.go +++ b/pkg/udf/reduce_udf.go @@ -23,12 +23,8 @@ import ( "strings" "sync" - "github.com/numaproj/numaflow-go/pkg/info" "go.uber.org/zap" - alignedfs "github.com/numaproj/numaflow/pkg/reduce/pbq/wal/aligned/fs" - noopwal "github.com/numaproj/numaflow/pkg/reduce/pbq/wal/noop" - dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/forwarder" "github.com/numaproj/numaflow/pkg/isb" @@ -36,12 +32,14 @@ import ( "github.com/numaproj/numaflow/pkg/reduce" "github.com/numaproj/numaflow/pkg/reduce/applier" "github.com/numaproj/numaflow/pkg/reduce/pbq" + alignedfs "github.com/numaproj/numaflow/pkg/reduce/pbq/wal/aligned/fs" + noopwal "github.com/numaproj/numaflow/pkg/reduce/pbq/wal/noop" "github.com/numaproj/numaflow/pkg/reduce/pbq/wal/unaligned" unalignedfs "github.com/numaproj/numaflow/pkg/reduce/pbq/wal/unaligned/fs" "github.com/numaproj/numaflow/pkg/reduce/pnf" "github.com/numaproj/numaflow/pkg/sdkclient" "github.com/numaproj/numaflow/pkg/sdkclient/reducer" - sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" + "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo" "github.com/numaproj/numaflow/pkg/sdkclient/sessionreducer" jsclient "github.com/numaproj/numaflow/pkg/shared/clients/nats" "github.com/numaproj/numaflow/pkg/shared/logging" @@ -94,19 +92,19 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { // create udf handler and wait until it is ready if windowType.Fixed != nil || windowType.Sliding != nil { - var serverInfo *info.ServerInfo + var serverInfo *serverinfo.ServerInfo var client reducer.Client // if streaming is enabled, use the reduceStreaming address if (windowType.Fixed != nil && windowType.Fixed.Streaming) || (windowType.Sliding != nil && windowType.Sliding.Streaming) { // Wait for server info to be ready - serverInfo, err = sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.ReduceStreamServerInfoFile)) + serverInfo, err = serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.ReduceStreamServerInfoFile)) if err != nil { return err } client, err = reducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize), sdkclient.WithUdsSockAddr(sdkclient.ReduceStreamAddr)) } else { // Wait for server info to be ready - serverInfo, err = sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.ReduceServerInfoFile)) + serverInfo, err = serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.ReduceServerInfoFile)) if err != nil { return err } @@ -132,7 +130,7 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error { healthChecker = reduceHandler } else if windowType.Session != nil { // Wait for server info to be ready - serverInfo, err := sdkserverinfo.SDKServerInfo(sdkserverinfo.WithServerInfoFilePath(sdkclient.SessionReduceServerInfoFile)) + serverInfo, err := serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.SessionReduceServerInfoFile)) if err != nil { return err } diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 21d6a28a7d..7748607ca5 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4,18 +4,18 @@ version = 3 [[package]] name = "addr2line" -version = "0.22.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" dependencies = [ "gimli", ] [[package]] -name = "adler" -version = "1.0.2" +name = "adler2" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" [[package]] name = "aho-corasick" @@ -43,9 +43,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "4e1496f8fb1fbf272686b8d37f523dab3e4a7443300055e74cdaa449f3114356" [[package]] name = "arc-swap" @@ -70,9 +70,9 @@ dependencies = [ "rand", "regex", "ring", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pemfile 2.1.3", - "rustls-webpki 0.102.6", + "rustls-webpki 0.102.8", "serde", "serde_json", "serde_nanos", @@ -110,9 +110,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.81" +version = "0.1.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" +checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", @@ -133,9 +133,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "aws-lc-rs" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae74d9bd0a7530e8afd1770739ad34b36838829d6ad61818f9230f683f5ad77" +checksum = "2f95446d919226d587817a7d21379e6eb099b97b45110a7f272a444ca5c54070" dependencies = [ "aws-lc-sys", "mirai-annotations", @@ -145,9 +145,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.20.1" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f0e249228c6ad2d240c2dc94b714d711629d52bad946075d8e9b2f5391f0703" +checksum = "234314bd569802ec87011d653d6815c6d7b9ffb969e9fee5b8b20ef860e8dce9" dependencies = [ "bindgen", "cc", @@ -240,7 +240,7 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "pin-project-lite", - "rustls 0.23.12", + "rustls 0.23.13", "rustls-pemfile 2.1.3", "rustls-pki-types", "tokio", @@ -259,17 +259,17 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.73" +version = "0.3.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" dependencies = [ "addr2line", - "cc", "cfg-if", "libc", "miniz_oxide", "object", "rustc-demangle", + "windows-targets 0.52.6", ] [[package]] @@ -360,12 +360,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.7" +version = "1.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476" dependencies = [ "jobserver", "libc", + "shlex", ] [[package]] @@ -411,9 +412,9 @@ dependencies = [ [[package]] name = "cmake" -version = "0.1.50" +version = "0.1.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130" +checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a" dependencies = [ "cc", ] @@ -499,15 +500,15 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.12" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" dependencies = [ "libc", ] @@ -667,9 +668,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" [[package]] name = "fiat-crypto" @@ -816,9 +817,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" [[package]] name = "glob" @@ -838,7 +839,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.3.0", + "indexmap 2.5.0", "slab", "tokio", "tokio-util", @@ -847,9 +848,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" dependencies = [ "atomic-waker", "bytes", @@ -857,7 +858,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap 2.3.0", + "indexmap 2.5.0", "slab", "tokio", "tokio-util", @@ -1034,7 +1035,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.5", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -1057,10 +1058,10 @@ dependencies = [ "headers", "http 1.1.0", "hyper 1.4.1", - "hyper-rustls 0.27.2", + "hyper-rustls 0.27.3", "hyper-util", "pin-project-lite", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "tokio", "tokio-rustls 0.26.0", "tower-service", @@ -1082,17 +1083,17 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.2" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", "hyper 1.4.1", "hyper-util", "log", - "rustls 0.23.12", - "rustls-native-certs", + "rustls 0.23.13", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -1114,9 +1115,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +checksum = "da62f120a8a37763efb0cf8fdf264b884c7b8b9ac8660b900c8661030c00e6ba" dependencies = [ "bytes", "futures-channel", @@ -1177,9 +1178,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.3.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -1187,9 +1188,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" [[package]] name = "itertools" @@ -1226,9 +1227,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29c15563dc2726973df627357ce0c9ddddbea194836909d655df6a75d2cf296d" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" dependencies = [ "wasm-bindgen", ] @@ -1274,9 +1275,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.94.0" +version = "0.94.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65b8611df85a1a2eed6f47bd8bcca4e2b3dc14fbf83658efd01423ca9a13b72a" +checksum = "52ace78a62b361077505f2950bd48aa3e46596fb15350c9c993de15ddfa3cac5" dependencies = [ "k8s-openapi", "kube-client", @@ -1285,9 +1286,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.94.0" +version = "0.94.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93c5ee3e48ef9b8d8fdb40ddd935f8addc8a201397e3c7552edae7bc96bc0a78" +checksum = "18ec0fcafd3add30b413b096a61d69b0a37f94d3f95b6f505a57ea3d27cec2a7" dependencies = [ "base64 0.22.1", "bytes", @@ -1300,14 +1301,14 @@ dependencies = [ "http-body-util", "hyper 1.4.1", "hyper-http-proxy", - "hyper-rustls 0.27.2", + "hyper-rustls 0.27.3", "hyper-timeout", "hyper-util", "jsonpath-rust", "k8s-openapi", "kube-core", "pem", - "rustls 0.23.12", + "rustls 0.23.13", "rustls-pemfile 2.1.3", "secrecy", "serde", @@ -1323,9 +1324,9 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.94.0" +version = "0.94.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe6e24d4cc7e32576f363986dc3dfc13e8e90731bd7a467b67fc6c4bfbf8e95" +checksum = "a50c095f051dada37740d883b6d47ad0430e95082140718073b773c8a70f231c" dependencies = [ "chrono", "form_urlencoded", @@ -1351,9 +1352,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.155" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "libloading" @@ -1438,18 +1439,18 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.7.4" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" dependencies = [ - "adler", + "adler2", ] [[package]] name = "mio" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ "hermit-abi", "libc", @@ -1475,7 +1476,7 @@ dependencies = [ "chrono", "hyper-util", "kube", - "numaflow 0.1.0 (git+https://github.com/numaproj/numaflow-rs.git?branch=main)", + "numaflow 0.1.1", "numaflow-models", "once_cell", "parking_lot", @@ -1484,7 +1485,7 @@ dependencies = [ "prost", "prost-types", "rcgen", - "rustls 0.23.12", + "rustls 0.23.13", "semver", "serde", "serde_json", @@ -1601,8 +1602,8 @@ dependencies = [ [[package]] name = "numaflow" -version = "0.1.0" -source = "git+https://github.com/numaproj/numaflow-rs.git?branch=main#f265a615716ab3ec3adf85e8c24413cc076cd695" +version = "0.1.1" +source = "git+https://github.com/numaproj/numaflow-rs.git?branch=main#d3afabd2fff1d070bb3fd79866c0389f009556b3" dependencies = [ "chrono", "futures-util", @@ -1637,9 +1638,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.2" +version = "0.36.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" +checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" dependencies = [ "memchr", ] @@ -1754,9 +1755,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95" +checksum = "9c73c26c01b8c87956cea613c907c9d6ecffd8d18a2a5908e5de0adfaa185cea" dependencies = [ "memchr", "thiserror", @@ -1765,9 +1766,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a" +checksum = "664d22978e2815783adbdd2c588b455b1bd625299ce36b2a99881ac9627e6d8d" dependencies = [ "pest", "pest_generator", @@ -1775,9 +1776,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183" +checksum = "a2d5487022d5d33f4c30d91c22afa240ce2a644e87fe08caad974d4eab6badbe" dependencies = [ "pest", "pest_meta", @@ -1788,9 +1789,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.11" +version = "2.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f" +checksum = "0091754bbd0ea592c4deb3a122ce8ecbb0753b738aa82bc055fcc2eccc8d8174" dependencies = [ "once_cell", "pest", @@ -1804,7 +1805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", - "indexmap 2.3.0", + "indexmap 2.5.0", ] [[package]] @@ -1872,9 +1873,9 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.20" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" +checksum = "479cf940fbbb3426c32c5d5176f62ad57549a0bb84773423ba8be9d089f5faba" dependencies = [ "proc-macro2", "syn", @@ -1914,9 +1915,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" +checksum = "3b2ecbe40f08db5c006b5764a2645f7f3f141ce756412ac9e1dd6087e6d32995" dependencies = [ "bytes", "prost-derive", @@ -1924,9 +1925,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" +checksum = "f8650aabb6c35b860610e9cff5dc1af886c9e25073b7b1712a68972af4281302" dependencies = [ "bytes", "heck 0.5.0", @@ -1945,9 +1946,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" +checksum = "acf0c195eebb4af52c752bec4f52f645da98b6e92077a04110c7f349477ae5ac" dependencies = [ "anyhow", "itertools 0.13.0", @@ -1958,18 +1959,18 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.13.1" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" +checksum = "60caa6738c7369b940c3d49246a8d1749323674c65cb13010134f5c9bad5b519" dependencies = [ "prost", ] [[package]] name = "quote" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" dependencies = [ "proc-macro2", ] @@ -2044,9 +2045,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" +checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853" dependencies = [ "bitflags 2.6.0", ] @@ -2188,18 +2189,18 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustc_version" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ "semver", ] [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" dependencies = [ "bitflags 2.6.0", "errno", @@ -2222,25 +2223,38 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.12" +version = "0.23.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" dependencies = [ "aws-lc-rs", "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.102.6", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] [[package]] name = "rustls-native-certs" -version = "0.7.1" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.3", + "rustls-pki-types", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" dependencies = [ "openssl-probe", "rustls-pemfile 2.1.3", @@ -2270,9 +2284,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" @@ -2286,9 +2300,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.6" +version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ "aws-lc-rs", "ring", @@ -2310,11 +2324,11 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "schannel" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +checksum = "e9aaafd5a2b6e3d657ff009d82fbd630b6bd54dd4eb06f21693925cdf80f9b8b" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2374,9 +2388,9 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.204" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" dependencies = [ "serde_derive", ] @@ -2393,9 +2407,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.204" +version = "1.0.210" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", @@ -2404,9 +2418,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.122" +version = "1.0.128" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" dependencies = [ "itoa", "memchr", @@ -2471,7 +2485,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.3.0", + "indexmap 2.5.0", "itoa", "ryu", "serde", @@ -2482,7 +2496,7 @@ dependencies = [ name = "servesink" version = "0.1.0" dependencies = [ - "numaflow 0.1.0 (git+https://github.com/numaproj/numaflow-rs.git?branch=main)", + "numaflow 0.1.1", "reqwest", "tokio", "tonic", @@ -2644,9 +2658,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.72" +version = "2.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" +checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" dependencies = [ "proc-macro2", "quote", @@ -2688,15 +2702,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fcd239983515c23a32fb82099f97d0b11b8c72f654ed659363a95c3dad7a53" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" dependencies = [ "cfg-if", "fastrand", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2786,9 +2800,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.3" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", @@ -2840,16 +2854,16 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.12", + "rustls 0.23.13", "rustls-pki-types", "tokio", ] [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -2858,9 +2872,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ "bytes", "futures-core", @@ -2896,7 +2910,7 @@ version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ - "indexmap 2.3.0", + "indexmap 2.5.0", "serde", "serde_spanned", "toml_datetime", @@ -2905,16 +2919,16 @@ dependencies = [ [[package]] name = "tonic" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +checksum = "c6f6ba989e4b2c58ae83d862d3a3e27690b6e3ae630d0deb59f3697f32aa88ad" dependencies = [ "async-stream", "async-trait", "axum", "base64 0.22.1", "bytes", - "h2 0.4.5", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -2935,9 +2949,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" +checksum = "fe4ee8877250136bd7e3d2331632810a4df4ea5e004656990d8d66d2f5ee8a67" dependencies = [ "prettyplease", "proc-macro2", @@ -2994,9 +3008,9 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -3117,9 +3131,9 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" [[package]] name = "unicode-normalization" @@ -3132,9 +3146,9 @@ dependencies = [ [[package]] name = "unicode-segmentation" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" [[package]] name = "unicode-width" @@ -3210,19 +3224,20 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4be2531df63900aeb2bca0daaaddec08491ee64ceecbee5076636a3b026795a8" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" dependencies = [ "cfg-if", + "once_cell", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "614d787b966d3989fa7bb98a654e369c762374fd3213d212cfc0251257e747da" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" dependencies = [ "bumpalo", "log", @@ -3235,9 +3250,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.42" +version = "0.4.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76bc14366121efc8dbb487ab05bcc9d346b3b5ec0eaa76e46594cabbe51762c0" +checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" dependencies = [ "cfg-if", "js-sys", @@ -3247,9 +3262,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1f8823de937b71b9460c0c34e25f3da88250760bec0ebac694b49997550d726" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3257,9 +3272,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" dependencies = [ "proc-macro2", "quote", @@ -3270,15 +3285,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.92" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" [[package]] name = "web-sys" -version = "0.3.69" +version = "0.3.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77afa9a11836342370f4817622a2f0f418b134426d91a82dfb48f532d2ec13ef" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" dependencies = [ "js-sys", "wasm-bindgen", @@ -3351,6 +3366,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -3535,17 +3559,3 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] diff --git a/rust/monovertex/src/lib.rs b/rust/monovertex/src/lib.rs index c1d172adf9..af69199be2 100644 --- a/rust/monovertex/src/lib.rs +++ b/rust/monovertex/src/lib.rs @@ -134,7 +134,7 @@ pub async fn init( .await .map_err(|e| { warn!("Error waiting for source server info file: {:?}", e); - Error::ForwarderError("Error waiting for server info file".to_string()) + Error::ForwarderError(format!("Error waiting for source server info file: {}", e)) })?; let mut source_client = SourceClient::connect(source_config).await?; @@ -142,7 +142,7 @@ pub async fn init( .await .map_err(|e| { warn!("Error waiting for sink server info file: {:?}", e); - Error::ForwarderError("Error waiting for server info file".to_string()) + Error::ForwarderError(format!("Error waiting for sink server info file: {}", e)) })?; let mut sink_client = SinkClient::connect(sink_config).await?; @@ -152,7 +152,7 @@ pub async fn init( .await .map_err(|e| { warn!("Error waiting for transformer server info file: {:?}", e); - Error::ForwarderError("Error waiting for server info file".to_string()) + Error::ForwarderError(format!("Error waiting for transformer server info file: {}", e)) })?; Some(TransformerClient::connect(config).await?) } else { @@ -164,7 +164,7 @@ pub async fn init( .await .map_err(|e| { warn!("Error waiting for fallback sink server info file: {:?}", e); - Error::ForwarderError("Error waiting for server info file".to_string()) + Error::ForwarderError(format!("Error waiting for fallback sink server info file: {}", e)) })?; Some(SinkClient::connect(config).await?) } else { diff --git a/rust/monovertex/src/server_info.rs b/rust/monovertex/src/server_info.rs index 7412b2ca9d..225218b158 100644 --- a/rust/monovertex/src/server_info.rs +++ b/rust/monovertex/src/server_info.rs @@ -256,11 +256,12 @@ mod version { // MINIMUM_SUPPORTED_SDK_VERSIONS is a HashMap with SDK language as key and minimum supported version as value static MINIMUM_SUPPORTED_SDK_VERSIONS: Lazy = Lazy::new(|| { // TODO: populate this from a static file and make it part of the release process + // the value of the map matches `minimumSupportedSDKVersions` in pkg/sdkclient/serverinfo/types.go let mut m = HashMap::new(); - m.insert("go".to_string(), "0.7.0-rc2".to_string()); - m.insert("python".to_string(), "0.7.0a1".to_string()); - m.insert("java".to_string(), "0.7.2-0".to_string()); - m.insert("rust".to_string(), "0.0.1".to_string()); + m.insert("go".to_string(), "0.8.0".to_string()); + m.insert("python".to_string(), "0.8.0".to_string()); + m.insert("java".to_string(), "0.8.0".to_string()); + m.insert("rust".to_string(), "0.1.0".to_string()); m }); @@ -402,6 +403,7 @@ mod tests { constraints.insert("python".to_string(), "1.2.0".to_string()); constraints.insert("java".to_string(), "2.0.0".to_string()); constraints.insert("go".to_string(), "0.10.0".to_string()); + constraints.insert("rust".to_string(), "0.1.0".to_string()); constraints } @@ -477,6 +479,30 @@ mod tests { assert!(result.is_err()); } + #[tokio::test] + async fn test_sdk_compatibility_rust_valid() { + let sdk_version = "v0.1.0"; + let sdk_language = "rust"; + + let min_supported_sdk_versions = create_sdk_constraints(); + let result = + check_sdk_compatibility(sdk_version, sdk_language, &min_supported_sdk_versions); + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_sdk_compatibility_rust_invalid() { + let sdk_version = "0.0.9"; + let sdk_language = "rust"; + + let min_supported_sdk_versions = create_sdk_constraints(); + let result = + check_sdk_compatibility(sdk_version, sdk_language, &min_supported_sdk_versions); + + assert!(result.is_err()); + } + #[tokio::test] async fn test_numaflow_compatibility_valid() { let numaflow_version = "1.4.0"; From c6003314c8f77905fbd86ddccab12853ca6c63a1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 13 Sep 2024 22:28:28 -0700 Subject: [PATCH 2/3] chore(deps): bump express from 4.19.2 to 4.21.0 in /ui (#2061) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- ui/yarn.lock | 93 +++++++++++++++++++++++++++------------------------- 1 file changed, 49 insertions(+), 44 deletions(-) diff --git a/ui/yarn.lock b/ui/yarn.lock index 36a04c8dd9..d71fb7d1e6 100644 --- a/ui/yarn.lock +++ b/ui/yarn.lock @@ -3857,10 +3857,10 @@ bluebird@^3.7.2: resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.2.tgz#9f229c15be272454ffa973ace0dbee79a1b0c36f" integrity sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg== -body-parser@1.20.2: - version "1.20.2" - resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.20.2.tgz#6feb0e21c4724d06de7ff38da36dad4f57a747fd" - integrity sha512-ml9pReCu3M61kGlqoTm2umSXTlRTuGTx0bfYj+uIUKKYycG5NtSbeetV3faSU6R7ajOPw0g/J1PvK4qNy7s5bA== +body-parser@1.20.3: + version "1.20.3" + resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.20.3.tgz#1953431221c6fb5cd63c4b36d53fab0928e548c6" + integrity sha512-7rAxByjUMqQ3/bHJy7D6OGXvx/MMc4IqBn/X0fcM1QUcAItpZrBEYhWGem+tzXH90c+G01ypMcYJBO9Y30203g== dependencies: bytes "3.1.2" content-type "~1.0.5" @@ -3870,7 +3870,7 @@ body-parser@1.20.2: http-errors "2.0.0" iconv-lite "0.4.24" on-finished "2.4.1" - qs "6.11.0" + qs "6.13.0" raw-body "2.5.2" type-is "~1.6.18" unpipe "1.0.0" @@ -5176,6 +5176,11 @@ encodeurl@~1.0.2: resolved "https://registry.yarnpkg.com/encodeurl/-/encodeurl-1.0.2.tgz#ad3ff4c86ec2d029322f5a02c3a9a606c95b3f59" integrity sha512-TPJXq8JqFaVYm2CWmPvnP2Iyo4ZSM7/QKcSmuMLDObfpH5fi7RUGmd/rTDf+rut/saiDiQEeVTNgAmJEdAOx0w== +encodeurl@~2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/encodeurl/-/encodeurl-2.0.0.tgz#7b8ea898077d7e409d3ac45474ea38eaf0857a58" + integrity sha512-Q0n9HRi4m6JuGIV1eFlmvJB7ZEVxu93IrMyiMsGC0lrMJMWzRgx6WGquyfQgZVb31vhGgXnfmPNNXmxnOkRBrg== + enhanced-resolve@^5.17.1: version "5.17.1" resolved "https://registry.yarnpkg.com/enhanced-resolve/-/enhanced-resolve-5.17.1.tgz#67bfbbcc2f81d511be77d686a90267ef7f898a15" @@ -5791,36 +5796,36 @@ expect@^27.5.1: jest-message-util "^27.5.1" express@^4.17.3: - version "4.19.2" - resolved "https://registry.yarnpkg.com/express/-/express-4.19.2.tgz#e25437827a3aa7f2a827bc8171bbbb664a356465" - integrity sha512-5T6nhjsT+EOMzuck8JjBHARTHfMht0POzlA60WV2pMD3gyXw2LZnZ+ueGdNxG+0calOJcWKbpFcuzLZ91YWq9Q== + version "4.21.0" + resolved "https://registry.yarnpkg.com/express/-/express-4.21.0.tgz#d57cb706d49623d4ac27833f1cbc466b668eb915" + integrity sha512-VqcNGcj/Id5ZT1LZ/cfihi3ttTn+NJmkli2eZADigjq29qTlWi/hAQ43t/VLPq8+UX06FCEx3ByOYet6ZFblng== dependencies: accepts "~1.3.8" array-flatten "1.1.1" - body-parser "1.20.2" + body-parser "1.20.3" content-disposition "0.5.4" content-type "~1.0.4" cookie "0.6.0" cookie-signature "1.0.6" debug "2.6.9" depd "2.0.0" - encodeurl "~1.0.2" + encodeurl "~2.0.0" escape-html "~1.0.3" etag "~1.8.1" - finalhandler "1.2.0" + finalhandler "1.3.1" fresh "0.5.2" http-errors "2.0.0" - merge-descriptors "1.0.1" + merge-descriptors "1.0.3" methods "~1.1.2" on-finished "2.4.1" parseurl "~1.3.3" - path-to-regexp "0.1.7" + path-to-regexp "0.1.10" proxy-addr "~2.0.7" - qs "6.11.0" + qs "6.13.0" range-parser "~1.2.1" safe-buffer "5.2.1" - send "0.18.0" - serve-static "1.15.0" + send "0.19.0" + serve-static "1.16.2" setprototypeof "1.2.0" statuses "2.0.1" type-is "~1.6.18" @@ -5959,13 +5964,13 @@ fill-range@^7.1.1: dependencies: to-regex-range "^5.0.1" -finalhandler@1.2.0: - version "1.2.0" - resolved "https://registry.yarnpkg.com/finalhandler/-/finalhandler-1.2.0.tgz#7d23fe5731b207b4640e4fcd00aec1f9207a7b32" - integrity sha512-5uXcUVftlQMFnWC9qu/svkWv3GTd2PfUhK/3PLkYNAe7FbqJMt3515HaxE6eRL74GdsriiwujiawdaB1BpEISg== +finalhandler@1.3.1: + version "1.3.1" + resolved "https://registry.yarnpkg.com/finalhandler/-/finalhandler-1.3.1.tgz#0c575f1d1d324ddd1da35ad7ece3df7d19088019" + integrity sha512-6BN9trH7bp3qvnrRyzsBz+g3lZxTNZTbVO2EV1CS0WIcDbawYVdYvGflME/9QP0h0pYlCDBCTjYa9nZzMDpyxQ== dependencies: debug "2.6.9" - encodeurl "~1.0.2" + encodeurl "~2.0.0" escape-html "~1.0.3" on-finished "2.4.1" parseurl "~1.3.3" @@ -8054,10 +8059,10 @@ memoize-one@^4.0.0: resolved "https://registry.yarnpkg.com/memoize-one/-/memoize-one-4.1.0.tgz#a2387c58c03fff27ca390c31b764a79addf3f906" integrity sha512-2GApq0yI/b22J2j9rhbrAlsHb0Qcz+7yWxeLG8h+95sl1XPUgeLimQSOdur4Vw7cUhrBHwaUZxWFZueojqNRzA== -merge-descriptors@1.0.1: - version "1.0.1" - resolved "https://registry.yarnpkg.com/merge-descriptors/-/merge-descriptors-1.0.1.tgz#b00aaa556dd8b44568150ec9d1b953f3f90cbb61" - integrity sha512-cCi6g3/Zr1iqQi6ySbseM1Xvooa98N0w31jzUYrXPX2xqObmFGHJ0tQ5u74H3mVh7wLouTseZyYIq39g8cNp1w== +merge-descriptors@1.0.3: + version "1.0.3" + resolved "https://registry.yarnpkg.com/merge-descriptors/-/merge-descriptors-1.0.3.tgz#d80319a65f3c7935351e5cfdac8f9318504dbed5" + integrity sha512-gaNvAS7TZ897/rVaZ0nMtAyxNyi/pdbjbAwUpFQpN70GqnVfOiXpeUUMKRBmzXaSQ8DdTX4/0ms62r2K+hE6mQ== merge-stream@^2.0.0: version "2.0.0" @@ -8638,10 +8643,10 @@ path-scurry@^1.11.1: lru-cache "^10.2.0" minipass "^5.0.0 || ^6.0.2 || ^7.0.0" -path-to-regexp@0.1.7: - version "0.1.7" - resolved "https://registry.yarnpkg.com/path-to-regexp/-/path-to-regexp-0.1.7.tgz#df604178005f522f15eb4490e7247a1bfaa67f8c" - integrity sha512-5DFkuoqlv1uYQKxy8omFBeJPQcdoE07Kv2sferDCrAq1ohOU+MSDswDIbnx3YAM60qIOnYa53wBhXW0EbMonrQ== +path-to-regexp@0.1.10: + version "0.1.10" + resolved "https://registry.yarnpkg.com/path-to-regexp/-/path-to-regexp-0.1.10.tgz#67e9108c5c0551b9e5326064387de4763c4d5f8b" + integrity sha512-7lf7qcQidTku0Gu3YDPc8DJ1q7OOucfa/BSsIwjuh56VU7katFvuM8hULfkwB3Fns/rsVF7PwPKVw1sl5KQS9w== path-to-regexp@^1.7.0: version "1.8.0" @@ -9394,12 +9399,12 @@ q@^1.1.2: resolved "https://registry.yarnpkg.com/q/-/q-1.5.1.tgz#7e32f75b41381291d04611f1bf14109ac00651d7" integrity sha512-kV/CThkXo6xyFEZUugw/+pIOywXcDbFYgSct5cT3gqlbkBE1SJdwy6UQoZvodiWF/ckQLZyDE/Bu1M6gVu5lVw== -qs@6.11.0: - version "6.11.0" - resolved "https://registry.yarnpkg.com/qs/-/qs-6.11.0.tgz#fd0d963446f7a65e1367e01abd85429453f0c37a" - integrity sha512-MvjoMCJwEarSbUYk5O+nmoSzSutSsTwF85zcHPQ9OrlFoZOYIjaqBAJIqIXjptyD5vThxGq52Xu/MaJzRkIk4Q== +qs@6.13.0: + version "6.13.0" + resolved "https://registry.yarnpkg.com/qs/-/qs-6.13.0.tgz#6ca3bd58439f7e245655798997787b0d88a51906" + integrity sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg== dependencies: - side-channel "^1.0.4" + side-channel "^1.0.6" querystringify@^2.1.1: version "2.2.0" @@ -10112,10 +10117,10 @@ semver@^7.2.1, semver@^7.3.2, semver@^7.3.5, semver@^7.3.7, semver@^7.5.3, semve resolved "https://registry.yarnpkg.com/semver/-/semver-7.6.3.tgz#980f7b5550bc175fb4dc09403085627f9eb33143" integrity sha512-oVekP1cKtI+CTDvHWYFUcMtsK/00wmAEfyqKfNdARm8u1wNVhSgaX7A8d4UuIlUI5e84iEwOhs7ZPYRmzU9U6A== -send@0.18.0: - version "0.18.0" - resolved "https://registry.yarnpkg.com/send/-/send-0.18.0.tgz#670167cc654b05f5aa4a767f9113bb371bc706be" - integrity sha512-qqWzuOjSFOuqPjFe4NOsMLafToQQwBSOEpS+FwEt3A2V3vKubTquT3vmLTQpFgMXp8AlFWFuP1qKaJZOtPpVXg== +send@0.19.0: + version "0.19.0" + resolved "https://registry.yarnpkg.com/send/-/send-0.19.0.tgz#bbc5a388c8ea6c048967049dbeac0e4a3f09d7f8" + integrity sha512-dW41u5VfLXu8SJh5bwRmyYUbAoSB3c9uQh6L8h/KtsFREPWpbX1lrljJo186Jc4nmci/sGUZ9a0a0J2zgfq2hw== dependencies: debug "2.6.9" depd "2.0.0" @@ -10158,15 +10163,15 @@ serve-index@^1.9.1: mime-types "~2.1.17" parseurl "~1.3.2" -serve-static@1.15.0: - version "1.15.0" - resolved "https://registry.yarnpkg.com/serve-static/-/serve-static-1.15.0.tgz#faaef08cffe0a1a62f60cad0c4e513cff0ac9540" - integrity sha512-XGuRDNjXUijsUL0vl6nSD7cwURuzEgglbOaFuZM9g3kwDXOWVTck0jLzjPzGD+TazWbboZYu52/9/XPdUgne9g== +serve-static@1.16.2: + version "1.16.2" + resolved "https://registry.yarnpkg.com/serve-static/-/serve-static-1.16.2.tgz#b6a5343da47f6bdd2673848bf45754941e803296" + integrity sha512-VqpjJZKadQB/PEbEwvFdO43Ax5dFBZ2UECszz8bQ7pi7wt//PWe1P6MN7eCnjsatYtBT6EuiClbjSWP2WrIoTw== dependencies: - encodeurl "~1.0.2" + encodeurl "~2.0.0" escape-html "~1.0.3" parseurl "~1.3.3" - send "0.18.0" + send "0.19.0" set-cookie-parser@^2.4.6: version "2.6.0" From 910ff9b4ec15e4a6d0bea0b790a9ec97bbe7e119 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Fri, 13 Sep 2024 22:51:04 -0700 Subject: [PATCH 3/3] chore: patch instead of update and bugfix (#2059) Signed-off-by: Derek Wang --- pkg/apis/numaflow/v1alpha1/const.go | 2 - pkg/reconciler/isbsvc/controller.go | 26 +++++------- pkg/reconciler/isbsvc/controller_test.go | 24 ----------- pkg/reconciler/monovertex/controller.go | 2 + pkg/reconciler/monovertex/scaling/scaling.go | 10 +---- pkg/reconciler/pipeline/controller.go | 43 +++++++------------- pkg/reconciler/pipeline/controller_test.go | 15 ------- pkg/reconciler/vertex/controller.go | 2 + pkg/reconciler/vertex/scaling/scaling.go | 10 +---- 9 files changed, 32 insertions(+), 102 deletions(-) diff --git a/pkg/apis/numaflow/v1alpha1/const.go b/pkg/apis/numaflow/v1alpha1/const.go index d0e9eb62f0..e36ec9bd34 100644 --- a/pkg/apis/numaflow/v1alpha1/const.go +++ b/pkg/apis/numaflow/v1alpha1/const.go @@ -41,8 +41,6 @@ const ( KeyPauseTimestamp = "numaflow.numaproj.io/pause-timestamp" KeyDefaultContainer = "kubectl.kubernetes.io/default-container" - RemovePauseTimestampPatch = `[{"op": "remove", "path": "/metadata/annotations/numaflow.numaproj.io~1pause-timestamp"}]` - // ID key in the header of sources like http KeyMetaID = "X-Numaflow-Id" KeyMetaEventTime = "X-Numaflow-Event-Time" diff --git a/pkg/reconciler/isbsvc/controller.go b/pkg/reconciler/isbsvc/controller.go index 1ab7d4e79b..d94e14424d 100644 --- a/pkg/reconciler/isbsvc/controller.go +++ b/pkg/reconciler/isbsvc/controller.go @@ -18,17 +18,20 @@ package isbsvc import ( "context" + "strings" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/yaml" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/reconciler" @@ -58,7 +61,7 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct isbSvc := &dfv1.InterStepBufferService{} if err := r.client.Get(ctx, req.NamespacedName, isbSvc); err != nil { if apierrors.IsNotFound(err) { - return reconcile.Result{}, nil + return ctrl.Result{}, nil } r.logger.Errorw("Unable to get ISB Service", zap.Any("request", req), zap.Error(err)) return ctrl.Result{}, err @@ -69,14 +72,15 @@ func (r *interStepBufferServiceReconciler) Reconcile(ctx context.Context, req ct if reconcileErr != nil { log.Errorw("Reconcile error", zap.Error(reconcileErr)) } - if r.needsUpdate(isbSvc, isbSvcCopy) { - // Update with a DeepCopy because .Status will be cleaned up. - if err := r.client.Update(ctx, isbSvcCopy.DeepCopy()); err != nil { - return reconcile.Result{}, err + if !equality.Semantic.DeepEqual(isbSvc.Finalizers, isbSvcCopy.Finalizers) { + patchYaml := "metadata:\n finalizers: [" + strings.Join(isbSvcCopy.Finalizers, ",") + "]" + patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml)) + if err := r.client.Patch(ctx, isbSvc, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil { + return ctrl.Result{}, err } } if err := r.client.Status().Update(ctx, isbSvcCopy); err != nil { - return reconcile.Result{}, err + return ctrl.Result{}, err } return ctrl.Result{}, reconcileErr } @@ -122,16 +126,6 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc return installer.Install(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder) } -func (r *interStepBufferServiceReconciler) needsUpdate(old, new *dfv1.InterStepBufferService) bool { - if old == nil { - return true - } - if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) { - return true - } - return false -} - func needsFinalizer(isbSvc *dfv1.InterStepBufferService) bool { if isbSvc.Spec.Redis != nil && isbSvc.Spec.Redis.Native != nil && isbSvc.Spec.Redis.Native.Persistence != nil { return true diff --git a/pkg/reconciler/isbsvc/controller_test.go b/pkg/reconciler/isbsvc/controller_test.go index 82aee9d90f..2a24ec7c69 100644 --- a/pkg/reconciler/isbsvc/controller_test.go +++ b/pkg/reconciler/isbsvc/controller_test.go @@ -202,42 +202,18 @@ func TestReconcileJetStream(t *testing.T) { func TestNeedsUpdate(t *testing.T) { t.Run("needs redis update", func(t *testing.T) { testIsbs := nativeRedisIsbs.DeepCopy() - cl := fake.NewClientBuilder().Build() - r := &interStepBufferServiceReconciler{ - client: cl, - scheme: scheme.Scheme, - config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig), - logger: zaptest.NewLogger(t).Sugar(), - } - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.AddFinalizer(testIsbs, finalizerName) assert.True(t, contains(testIsbs.Finalizers, finalizerName)) - assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.RemoveFinalizer(testIsbs, finalizerName) assert.False(t, contains(testIsbs.Finalizers, finalizerName)) - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) - testIsbs.Status.MarkConfigured() - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) }) t.Run("needs jetstream update", func(t *testing.T) { testIsbs := jetStreamIsbs.DeepCopy() - cl := fake.NewClientBuilder().Build() - r := &interStepBufferServiceReconciler{ - client: cl, - scheme: scheme.Scheme, - config: reconciler.FakeGlobalConfig(t, fakeGlobalISBSvcConfig), - logger: zaptest.NewLogger(t).Sugar(), - } - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.AddFinalizer(testIsbs, finalizerName) assert.True(t, contains(testIsbs.Finalizers, finalizerName)) - assert.True(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) controllerutil.RemoveFinalizer(testIsbs, finalizerName) assert.False(t, contains(testIsbs.Finalizers, finalizerName)) - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) - testIsbs.Status.MarkConfigured() - assert.False(t, r.needsUpdate(nativeRedisIsbs, testIsbs)) }) } diff --git a/pkg/reconciler/monovertex/controller.go b/pkg/reconciler/monovertex/controller.go index 9aca247a05..3fbfb3c1ab 100644 --- a/pkg/reconciler/monovertex/controller.go +++ b/pkg/reconciler/monovertex/controller.go @@ -236,6 +236,8 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash } else { // Update scenario if updatedReplicas >= desiredReplicas { + monoVtx.Status.UpdatedReplicas = uint32(desiredReplicas) + monoVtx.Status.CurrentHash = monoVtx.Status.UpdateHash return nil } diff --git a/pkg/reconciler/monovertex/scaling/scaling.go b/pkg/reconciler/monovertex/scaling/scaling.go index 481408672c..0b35265190 100644 --- a/pkg/reconciler/monovertex/scaling/scaling.go +++ b/pkg/reconciler/monovertex/scaling/scaling.go @@ -19,7 +19,6 @@ package scaling import ( "container/list" "context" - "encoding/json" "fmt" "math" "strings" @@ -30,7 +29,6 @@ import ( "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -359,12 +357,8 @@ func (s *Scaler) Start(ctx context.Context) error { func (s *Scaler) patchMonoVertexReplicas(ctx context.Context, monoVtx *dfv1.MonoVertex, desiredReplicas int32) error { log := logging.FromContext(ctx) origin := monoVtx.Spec.Replicas - monoVtx.Spec.Replicas = ptr.To[int32](desiredReplicas) - body, err := json.Marshal(monoVtx) - if err != nil { - return fmt.Errorf("failed to marshal MonoVertex object to json, %w", err) - } - if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) { + patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas) + if err := s.client.Patch(ctx, monoVtx, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to patch MonoVertex replicas, %w", err) } log.Infow("Auto scaling - mono vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", monoVtx.Namespace), zap.String("vertex", monoVtx.Name)) diff --git a/pkg/reconciler/pipeline/controller.go b/pkg/reconciler/pipeline/controller.go index d8b989f2d6..b2f99e7b1d 100644 --- a/pkg/reconciler/pipeline/controller.go +++ b/pkg/reconciler/pipeline/controller.go @@ -18,7 +18,6 @@ package pipeline import ( "context" - "encoding/json" "fmt" "strings" "time" @@ -40,6 +39,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/yaml" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" daemonclient "github.com/numaproj/numaflow/pkg/daemon/client" @@ -51,6 +51,8 @@ import ( const ( finalizerName = dfv1.ControllerPipeline + + pauseTimestampPath = `/metadata/annotations/numaflow.numaproj.io~1pause-timestamp` ) // pipelineReconciler reconciles a pipeline object. @@ -85,9 +87,10 @@ func (r *pipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c log.Errorw("Reconcile error", zap.Error(reconcileErr)) } plCopy.Status.LastUpdated = metav1.Now() - if needsUpdate(pl, plCopy) { - // Update with a DeepCopy because .Status will be cleaned up. - if err := r.client.Update(ctx, plCopy.DeepCopy()); err != nil { + if !equality.Semantic.DeepEqual(pl.Finalizers, plCopy.Finalizers) { + patchYaml := "metadata:\n finalizers: [" + strings.Join(plCopy.Finalizers, ",") + "]" + patchJson, _ := yaml.YAMLToJSON([]byte(patchYaml)) + if err := r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil { return result, err } } @@ -292,7 +295,9 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df r.recorder.Eventf(pl, corev1.EventTypeNormal, "CreateVertexSuccess", "Created vertex %s successfully", vertexName) } else { if oldObj.GetAnnotations()[dfv1.KeyHash] != newObj.GetAnnotations()[dfv1.KeyHash] { // need to update + originalReplicas := oldObj.Spec.Replicas oldObj.Spec = newObj.Spec + oldObj.Spec.Replicas = originalReplicas oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash] if err := r.client.Update(ctx, &oldObj); err != nil { r.recorder.Eventf(pl, corev1.EventTypeWarning, "UpdateVertexFailed", "Failed to update vertex: %w", err.Error()) @@ -588,17 +593,6 @@ func (r *pipelineReconciler) cleanUpBuffers(ctx context.Context, pl *dfv1.Pipeli return nil } -func needsUpdate(old, new *dfv1.Pipeline) bool { - if old == nil { - return true - } - if !equality.Semantic.DeepEqual(old.Finalizers, new.Finalizers) { - return true - } - - return false -} - func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex { result := make(map[string]dfv1.Vertex) for _, v := range pl.Spec.Vertices { @@ -814,7 +808,7 @@ func (r *pipelineReconciler) updateDesiredState(ctx context.Context, pl *dfv1.Pi func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) { // reset pause timestamp if pl.GetAnnotations()[dfv1.KeyPauseTimestamp] != "" { - err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(dfv1.RemovePauseTimestampPatch))) + err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(`[{"op": "remove", "path": "`+pauseTimestampPath+`"}]`))) if err != nil { if apierrors.IsNotFound(err) { return false, nil // skip pipeline if it can't be found @@ -837,13 +831,8 @@ func (r *pipelineReconciler) resumePipeline(ctx context.Context, pl *dfv1.Pipeli func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipeline) (bool, error) { // check that annotations / pause timestamp annotation exist if pl.GetAnnotations() == nil || pl.GetAnnotations()[dfv1.KeyPauseTimestamp] == "" { - pl.SetAnnotations(map[string]string{dfv1.KeyPauseTimestamp: time.Now().Format(time.RFC3339)}) - body, err := json.Marshal(pl) - if err != nil { - return false, err - } - err = r.client.Patch(ctx, pl, client.RawPatch(types.MergePatchType, body)) - if err != nil && !apierrors.IsNotFound(err) { + patchJson := `[{"op": "add", "path": "` + pauseTimestampPath + `", "value": "` + time.Now().Format(time.RFC3339) + `"}]` + if err := r.client.Patch(ctx, pl, client.RawPatch(types.JSONPatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) { return true, err } } @@ -924,12 +913,8 @@ func (r *pipelineReconciler) scaleVertex(ctx context.Context, pl *dfv1.Pipeline, } } } - vertex.Spec.Replicas = ptr.To[int32](scaleTo) - body, err := json.Marshal(vertex) - if err != nil { - return false, err - } - err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, body)) + patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, scaleTo) + err = r.client.Patch(ctx, &vertex, client.RawPatch(types.MergePatchType, []byte(patchJson))) if err != nil && !apierrors.IsNotFound(err) { return false, err } diff --git a/pkg/reconciler/pipeline/controller_test.go b/pkg/reconciler/pipeline/controller_test.go index e130f49656..aafff27cc3 100644 --- a/pkg/reconciler/pipeline/controller_test.go +++ b/pkg/reconciler/pipeline/controller_test.go @@ -37,7 +37,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/reconciler" @@ -352,8 +351,6 @@ func Test_pauseAndResumePipeline(t *testing.T) { v, err := r.findExistingVertices(ctx, testObj) assert.NoError(t, err) assert.Equal(t, int32(0), *v[testObj.Name+"-"+testObj.Spec.Vertices[0].Name].Spec.Replicas) - assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) - testObj.Annotations[dfv1.KeyPauseTimestamp] = "" _, err = r.resumePipeline(ctx, testObj) assert.NoError(t, err) v, err = r.findExistingVertices(ctx, testObj) @@ -380,8 +377,6 @@ func Test_pauseAndResumePipeline(t *testing.T) { assert.NoError(t, err) _, err = r.findExistingVertices(ctx, testObj) assert.NoError(t, err) - assert.NotNil(t, testObj.Annotations[dfv1.KeyPauseTimestamp]) - testObj.Annotations[dfv1.KeyPauseTimestamp] = "" _, err = r.resumePipeline(ctx, testObj) assert.NoError(t, err) v, err := r.findExistingVertices(ctx, testObj) @@ -560,16 +555,6 @@ func Test_buildISBBatchJob(t *testing.T) { }) } -func Test_needsUpdate(t *testing.T) { - testObj := testPipeline.DeepCopy() - assert.True(t, needsUpdate(nil, testObj)) - assert.False(t, needsUpdate(testPipeline, testObj)) - controllerutil.AddFinalizer(testObj, finalizerName) - assert.True(t, needsUpdate(testPipeline, testObj)) - testobj1 := testObj.DeepCopy() - assert.False(t, needsUpdate(testObj, testobj1)) -} - func Test_cleanupBuffers(t *testing.T) { cl := fake.NewClientBuilder().Build() ctx := context.TODO() diff --git a/pkg/reconciler/vertex/controller.go b/pkg/reconciler/vertex/controller.go index 8789b5d89a..20945639ab 100644 --- a/pkg/reconciler/vertex/controller.go +++ b/pkg/reconciler/vertex/controller.go @@ -259,6 +259,8 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver vertex.Status.CurrentHash = vertex.Status.UpdateHash } else { // Update scenario if updatedReplicas >= desiredReplicas { + vertex.Status.UpdatedReplicas = uint32(desiredReplicas) + vertex.Status.CurrentHash = vertex.Status.UpdateHash return nil } diff --git a/pkg/reconciler/vertex/scaling/scaling.go b/pkg/reconciler/vertex/scaling/scaling.go index eed5981e89..139e189f5f 100644 --- a/pkg/reconciler/vertex/scaling/scaling.go +++ b/pkg/reconciler/vertex/scaling/scaling.go @@ -19,7 +19,6 @@ package scaling import ( "container/list" "context" - "encoding/json" "fmt" "math" "strings" @@ -30,7 +29,6 @@ import ( "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" @@ -499,12 +497,8 @@ loop: func (s *Scaler) patchVertexReplicas(ctx context.Context, vertex *dfv1.Vertex, desiredReplicas int32) error { log := logging.FromContext(ctx) origin := vertex.Spec.Replicas - vertex.Spec.Replicas = ptr.To[int32](desiredReplicas) - body, err := json.Marshal(vertex) - if err != nil { - return fmt.Errorf("failed to marshal vertex object to json, %w", err) - } - if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, body)); err != nil && !apierrors.IsNotFound(err) { + patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desiredReplicas) + if err := s.client.Patch(ctx, vertex, client.RawPatch(types.MergePatchType, []byte(patchJson))); err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to patch vertex replicas, %w", err) } log.Infow("Auto scaling - vertex replicas changed.", zap.Int32p("from", origin), zap.Int32("to", desiredReplicas), zap.String("namespace", vertex.Namespace), zap.String("pipeline", vertex.Spec.PipelineName), zap.String("vertex", vertex.Spec.Name))