Skip to content

Commit

Permalink
chore: remove server info dependencies on go sdk (#2060)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored and whynowy committed Sep 26, 2024
1 parent e09a246 commit f9176f1
Show file tree
Hide file tree
Showing 28 changed files with 531 additions and 319 deletions.
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19203,7 +19203,7 @@
},
"limits": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.MonoVertexLimits",
"description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings"
"description": "Limits define the limitations such as read batch size for the mono vertex."
},
"metadata": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Metadata",
Expand Down
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19198,7 +19198,7 @@
}
},
"limits": {
"description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings",
"description": "Limits define the limitations such as read batch size for the mono vertex.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.MonoVertexLimits"
},
"metadata": {
Expand Down
8 changes: 4 additions & 4 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5720,8 +5720,8 @@ MonoVertexLimits </a> </em>
<em>(Optional)</em>
<p>

Limits define the limitations such as buffer read batch size for all the
vertices of a pipeline, will override pipeline level settings
Limits define the limitations such as read batch size for the mono
vertex.
</p>

</td>
Expand Down Expand Up @@ -6109,8 +6109,8 @@ MonoVertexLimits </a> </em>
<em>(Optional)</em>
<p>

Limits define the limitations such as buffer read batch size for all the
vertices of a pipeline, will override pipeline level settings
Limits define the limitations such as read batch size for the mono
vertex.
</p>

</td>
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ type MonoVertexSpec struct {
// +patchStrategy=merge
// +patchMergeKey=name
Volumes []corev1.Volume `json:"volumes,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,6,rep,name=volumes"`
// Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings
// Limits define the limitations such as read batch size for the mono vertex.
// +optional
Limits *MonoVertexLimits `json:"limits,omitempty" protobuf:"bytes,7,opt,name=limits"`
// Settings for autoscaling
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions pkg/sdkclient/batchmapper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import (
"errors"
"io"

batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -39,7 +38,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.BatchMapAddr)

for _, inputOption := range inputOptions {
Expand Down
7 changes: 3 additions & 4 deletions pkg/sdkclient/grpc/grpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@ import (
"log"
"strconv"

"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

resolver "github.com/numaproj/numaflow/pkg/sdkclient/grpc_resolver"
sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// ConnectToServer connects to the server with the given socket address based on the server info protocol.
func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) {
func ConnectToServer(udsSockAddr string, serverInfo *serverinfo.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) {
var conn *grpc.ClientConn
var err error
var sockAddr string

// Check if Multiproc server mode is enabled
if multiProcServer, ok := serverInfo.Metadata[sdkserverinfo.MultiProcMetadata]; ok {
if multiProcServer, ok := serverInfo.Metadata[serverinfo.MultiProcKey]; ok {
// Extract the server ports from the server info file
numServers, _ := strconv.Atoi(multiProcServer)
// In Multiprocessing server mode we have multiple servers forks
Expand Down
5 changes: 3 additions & 2 deletions pkg/sdkclient/mapper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerror "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -36,7 +37,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.MapAddr)

for _, inputOption := range inputOptions {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sdkclient/mapstreamer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"fmt"
"io"

mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerror "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -38,7 +38,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.MapStreamAddr)

for _, inputOption := range inputOptions {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sdkclient/reducer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"errors"
"io"

reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -38,7 +38,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.ReduceAddr)

for _, inputOption := range inputOptions {
Expand Down
103 changes: 67 additions & 36 deletions pkg/sdkclient/serverinfo/serverinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,23 @@ package serverinfo

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"

"github.com/Masterminds/semver/v3"
pep440 "github.com/aquasecurity/go-pep440-version"

"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow"
)

// Metadata keys used in the server info file
const (
// MultiProcMetadata is the field used to indicate that MultiProc map mode is enabled
// The value contains the number of servers spawned.
MultiProcMetadata = "MULTIPROC"
// MapModeMetadata field is used to indicate which map mode is enabled
// If none is set, we consider unary map as default
MapModeMetadata = "MAP_MODE"
)

type MapMode string

const (
UnaryMap MapMode = "unary-map"
StreamMap MapMode = "stream-map"
BatchMap MapMode = "batch-map"
)
// END is the end-of-file marker expected at the tail of the server info file.
// It is the %U rendering of a backslash followed by "__END__", i.e. the
// string "U+005C__END__". read only treats the file as completely written
// once its content ends with this marker, and strips the marker before
// decoding the JSON payload.
var END = fmt.Sprintf("%U__END__", '\\')

// SDKServerInfo wait for the server to start and return the server info.
func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) {
func SDKServerInfo(inputOptions ...Option) (*ServerInfo, error) {
var opts = DefaultOptions()

for _, inputOption := range inputOptions {
Expand All @@ -68,33 +52,32 @@ func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) {
}

// waitForServerInfo waits until the server info is ready. It returns an error if the server info is not ready within the given timeout
func waitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo, error) {
func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := info.WaitUntilReady(ctx, info.WithServerInfoFilePath(filePath)); err != nil {
if err := waitUntilReady(ctx, WithServerInfoFilePath(filePath)); err != nil {
return nil, fmt.Errorf("failed to wait until server info is ready: %w", err)
}

serverInfo, err := info.Read(info.WithServerInfoFilePath(filePath))
serverInfo, err := read(WithServerInfoFilePath(filePath))
if err != nil {
return nil, fmt.Errorf("failed to read server info: %w", err)
}

sdkVersion := serverInfo.Version
minNumaflowVersion := serverInfo.MinimumNumaflowVersion
sdkLanguage := serverInfo.Language
numaflowVersion := numaflow.GetVersion().Version

// If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an
// error writing server info on the SDK side
// error writing server info file on the SDK side
if minNumaflowVersion == "" {
log.Printf("warning: failed to get the minimum numaflow version, skipping numaflow version compatibility check")
// If we are testing locally or in CI, we can skip checking for numaflow compatibility issues
// because both return us a version string that the version check libraries can't properly parse (local: "*latest*" CI: commit SHA)
// because both return us a version string that the version-check libraries can't properly parse,
// local: "*latest*", CI: commit SHA
} else if !strings.Contains(numaflowVersion, "latest") && !strings.Contains(numaflowVersion, numaflow.GetVersion().GitCommit) {
if err := checkNumaflowCompatibility(numaflowVersion, minNumaflowVersion); err != nil {
return nil, fmt.Errorf("numaflow %s does not satisfy the minimum required by SDK %s: %w",
return nil, fmt.Errorf("numaflow version %s does not satisfy the minimum required by SDK version %s: %w",
numaflowVersion, sdkVersion, err)
}
}
Expand All @@ -105,14 +88,66 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo
log.Printf("warning: failed to get the SDK version/language, skipping SDK version compatibility check")
} else {
if err := checkSDKCompatibility(sdkVersion, sdkLanguage, minimumSupportedSDKVersions); err != nil {
return nil, fmt.Errorf("SDK %s does not satisfy the minimum required by numaflow %s: %w",
return nil, fmt.Errorf("SDK version %s does not satisfy the minimum required by numaflow version %s: %w",
sdkVersion, numaflowVersion, err)
}
}

return serverInfo, nil
}

// waitUntilReady blocks until the server info file exists and is non-empty,
// or until ctx is done, whichever comes first.
//
// The file path is taken from the options (defaults overridden by opts).
// It returns nil once the file has a size greater than zero, and ctx.Err()
// if the context is cancelled or times out before that.
//
// Every wait between polls is context-aware, and a wait always happens
// between polls: a file that exists but is still empty is re-checked after a
// short pause instead of being polled in a tight loop.
func waitUntilReady(ctx context.Context, opts ...Option) error {
	options := DefaultOptions()
	for _, opt := range opts {
		opt(options)
	}

	const (
		// missingFileInterval is the pause used while the file does not exist yet.
		missingFileInterval = 1 * time.Second
		// emptyFileInterval is the pause used once the file exists but has no
		// content yet; the writer is presumably about to fill it, so poll faster.
		emptyFileInterval = 100 * time.Millisecond
	)

	for {
		// Give cancellation priority over a ready file.
		if err := ctx.Err(); err != nil {
			return err
		}

		wait := emptyFileInterval
		fileInfo, err := os.Stat(options.serverInfoFilePath)
		switch {
		case err != nil:
			log.Printf("Server info file %s is not ready...", options.serverInfoFilePath)
			wait = missingFileInterval
		case fileInfo.Size() > 0:
			return nil
		}

		// Sleep until the next poll, but wake up immediately on cancellation.
		timer := time.NewTimer(wait)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
}

// read loads the server info file and decodes it into a ServerInfo.
//
// The file path comes from the options (defaults overridden by opts). The
// file is considered complete only when its content ends with END; until
// then it is re-read every 100ms, for at most 10 additional attempts.
// A read error is returned as-is, a file that never gets the END marker
// yields a "not ready" error, and a JSON decoding failure is wrapped.
func read(opts ...Option) (*ServerInfo, error) {
	options := DefaultOptions()
	for _, apply := range opts {
		apply(options)
	}

	// It takes some time for the server to write the server info file
	// TODO: use a better way to wait for the file to be ready
	const maxRetries = 10
	content, err := os.ReadFile(options.serverInfoFilePath)
	for attempt := 0; attempt < maxRetries; attempt++ {
		// Stop polling as soon as the read fails or the marker shows up.
		if err != nil || strings.HasSuffix(string(content), END) {
			break
		}
		time.Sleep(100 * time.Millisecond)
		content, err = os.ReadFile(options.serverInfoFilePath)
	}
	if err != nil {
		return nil, err
	}

	text := string(content)
	if !strings.HasSuffix(text, END) {
		return nil, fmt.Errorf("server info file is not ready")
	}

	// Drop the trailing END marker; what is left is the JSON document.
	payload := []byte(strings.TrimSuffix(text, END))
	serverInfo := new(ServerInfo)
	if err := json.Unmarshal(payload, serverInfo); err != nil {
		return nil, fmt.Errorf("failed to unmarshal server info: %w", err)
	}
	return serverInfo, nil
}

func checkConstraint(version *semver.Version, constraint string) error {
if c, err := semver.NewConstraint(constraint); err != nil {
return fmt.Errorf("error parsing constraint: %w, constraint string: %s", err, constraint)
Expand All @@ -128,26 +163,23 @@ func checkNumaflowCompatibility(numaflowVersion string, minNumaflowVersion strin
if minNumaflowVersion == "" {
return fmt.Errorf("server info does not contain minimum numaflow version. Upgrade to newer SDK version")
}

numaflowVersionSemVer, err := semver.NewVersion(numaflowVersion)
if err != nil {
return fmt.Errorf("error parsing numaflow version: %w", err)
}

numaflowConstraint := fmt.Sprintf(">= %s", minNumaflowVersion)
if err = checkConstraint(numaflowVersionSemVer, numaflowConstraint); err != nil {
return fmt.Errorf("numaflow version %s must be upgraded to at least %s, in order to work with current SDK version: %w",
numaflowVersionSemVer.String(), minNumaflowVersion, err)
}

return nil
}

// checkSDKCompatibility checks if the current SDK version is compatible with the numaflow version
func checkSDKCompatibility(sdkVersion string, sdkLanguage info.Language, minSupportedSDKVersions sdkConstraints) error {
func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, minSupportedSDKVersions sdkConstraints) error {
if sdkRequiredVersion, ok := minSupportedSDKVersions[sdkLanguage]; ok {
sdkConstraint := fmt.Sprintf(">= %s", sdkRequiredVersion)
if sdkLanguage == info.Python {
if sdkLanguage == Python {
// Python pre-releases/releases follow PEP440 specification which requires a different library for parsing
sdkVersionPEP440, err := pep440.Parse(sdkVersion)
if err != nil {
Expand Down Expand Up @@ -175,6 +207,5 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage info.Language, minSupp
}
}
}

return nil
}
Loading

0 comments on commit f9176f1

Please sign in to comment.