Skip to content

Commit

Permalink
feat: add sdk infomation metrics (#2208)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Nov 18, 2024
1 parent 8f7132d commit d507e19
Show file tree
Hide file tree
Showing 17 changed files with 375 additions and 282 deletions.
2 changes: 1 addition & 1 deletion examples/21-simple-mono-vertex.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ spec:
sink:
udsink:
container:
image: quay.io/numaio/numaflow-rs/sink-log:stable
image: quay.io/numaio/numaflow-rs/sink-log:stable
13 changes: 13 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,22 @@ const (
LabelPartitionName = "partition_name"
LabelMonoVertexName = "mvtx_name"

LabelComponent = "component"
LabelComponentName = "component_name"
LabelSDKLanguage = "language"
LabelSDKVersion = "version"
LabelSDKType = "type" // container type, e.g sourcer, sourcetransformer, sinker, etc. see serverinfo.ContainerType

LabelReason = "reason"
)

var (
SDKInfo = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "sdk_info",
Help: "A metric with a constant value '1', labeled by SDK information such as version, language, and type",
}, []string{LabelComponent, LabelComponentName, LabelSDKType, LabelSDKVersion, LabelSDKLanguage})
)

// Generic forwarder metrics
var (
// ReadMessagesCount is used to indicate the number of total messages read
Expand Down
20 changes: 12 additions & 8 deletions pkg/sdkclient/serverinfo/serverinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, err
minNumaflowVersion := serverInfo.MinimumNumaflowVersion
sdkLanguage := serverInfo.Language
numaflowVersion := numaflow.GetVersion().Version
containerType, err := getContainerType(filePath)
if err != nil {
return nil, fmt.Errorf("failed to get container type: %w", err)
containerType := getContainerType(filePath)
if containerType == ContainerTypeUnknown {
return nil, fmt.Errorf("unknown container type")
}

// If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an
Expand Down Expand Up @@ -221,11 +221,15 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, containerTyp

// getContainerType returns the container type from the server info file path
// serverInfoFilePath is in the format of "/var/run/numaflow/{ContainerType}-server-info"
func getContainerType(serverInfoFilePath string) (ContainerType, error) {
func getContainerType(serverInfoFilePath string) ContainerType {
splits := strings.Split(serverInfoFilePath, "/")
if containerType := strings.TrimSuffix(splits[len(splits)-1], "-server-info"); containerType == "" {
return "", fmt.Errorf("failed to get container type from server info file path: %s", serverInfoFilePath)
} else {
return ContainerType(containerType), nil
containerType := ContainerType(strings.TrimSuffix(splits[len(splits)-1], "-server-info"))
switch containerType {
case ContainerTypeSourcer, ContainerTypeSourcetransformer, ContainerTypeSinker, ContainerTypeMapper,
ContainerTypeReducer, ContainerTypeReducestreamer, ContainerTypeSessionreducer,
ContainerTypeSideinput, ContainerTypeFbsinker:
return containerType
default:
return ContainerTypeUnknown
}
}
20 changes: 10 additions & 10 deletions pkg/sdkclient/serverinfo/serverinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,16 @@ func Test_CheckNumaflowCompatibility(t *testing.T) {
func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
var testMinimumSupportedSDKVersions = sdkConstraints{
Python: map[ContainerType]string{
sourcer: "0.6.0rc100",
ContainerTypeSourcer: "0.6.0rc100",
},
Go: map[ContainerType]string{
sourcer: "0.6.0-z",
ContainerTypeSourcer: "0.6.0-z",
},
Java: map[ContainerType]string{
sourcer: "0.6.0-z",
ContainerTypeSourcer: "0.6.0-z",
},
Rust: map[ContainerType]string{
sourcer: "0.1.0-z",
ContainerTypeSourcer: "0.1.0-z",
},
}
tests := []struct {
Expand Down Expand Up @@ -283,7 +283,7 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, ContainerTypeSourcer, tt.minimumSupportedSDKVersions)
if tt.shouldErr {
assert.Error(t, err, "Expected error")
assert.Contains(t, err.Error(), tt.errMessage)
Expand All @@ -298,16 +298,16 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
var testMinimumSupportedSDKVersions = sdkConstraints{
Python: map[ContainerType]string{
sourcer: "0.6.0b1",
ContainerTypeSourcer: "0.6.0b1",
},
Go: map[ContainerType]string{
sourcer: "0.6.0-rc2",
ContainerTypeSourcer: "0.6.0-rc2",
},
Java: map[ContainerType]string{
sourcer: "0.6.0-rc2",
ContainerTypeSourcer: "0.6.0-rc2",
},
Rust: map[ContainerType]string{
sourcer: "0.1.0-rc3",
ContainerTypeSourcer: "0.1.0-rc3",
},
}
tests := []struct {
Expand Down Expand Up @@ -395,7 +395,7 @@ func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, ContainerTypeSourcer, tt.minimumSupportedSDKVersions)
if tt.shouldErr {
assert.Error(t, err, "Expected error")
assert.Contains(t, err.Error(), tt.errMessage)
Expand Down
97 changes: 49 additions & 48 deletions pkg/sdkclient/serverinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ type ContainerType string
// the string content matches the corresponding server info file name.
// DO NOT change it unless the server info file name is changed.
const (
sourcer ContainerType = "sourcer"
sourcetransformer ContainerType = "sourcetransformer"
sinker ContainerType = "sinker"
mapper ContainerType = "mapper"
reducer ContainerType = "reducer"
reducestreamer ContainerType = "reducestreamer"
sessionreducer ContainerType = "sessionreducer"
sideinput ContainerType = "sideinput"
fbsinker ContainerType = "fb-sinker"
ContainerTypeSourcer ContainerType = "sourcer"
ContainerTypeSourcetransformer ContainerType = "sourcetransformer"
ContainerTypeSinker ContainerType = "sinker"
ContainerTypeMapper ContainerType = "mapper"
ContainerTypeReducer ContainerType = "reducer"
ContainerTypeReducestreamer ContainerType = "reducestreamer"
ContainerTypeSessionreducer ContainerType = "sessionreducer"
ContainerTypeSideinput ContainerType = "sideinput"
ContainerTypeFbsinker ContainerType = "fb-sinker"
ContainerTypeUnknown ContainerType = "unknown"
)

type sdkConstraints map[Language]map[ContainerType]string
Expand Down Expand Up @@ -87,51 +88,51 @@ More details about version comparison can be found in the PEP 440 and semver doc
var minimumSupportedSDKVersions = sdkConstraints{
Python: map[ContainerType]string{
// meaning the minimum supported python SDK version is 0.9.0
sourcer: "0.9.0rc100",
sourcetransformer: "0.9.0rc100",
sinker: "0.9.0rc100",
mapper: "0.9.0rc100",
reducer: "0.9.0rc100",
reducestreamer: "0.9.0rc100",
sessionreducer: "0.9.0rc100",
sideinput: "0.9.0rc100",
fbsinker: "0.9.0rc100",
ContainerTypeSourcer: "0.9.0rc100",
ContainerTypeSourcetransformer: "0.9.0rc100",
ContainerTypeSinker: "0.9.0rc100",
ContainerTypeMapper: "0.9.0rc100",
ContainerTypeReducer: "0.9.0rc100",
ContainerTypeReducestreamer: "0.9.0rc100",
ContainerTypeSessionreducer: "0.9.0rc100",
ContainerTypeSideinput: "0.9.0rc100",
ContainerTypeFbsinker: "0.9.0rc100",
},
Go: map[ContainerType]string{
// meaning the minimum supported go SDK version is 0.8.0
sourcer: "0.9.0-z",
sourcetransformer: "0.9.0-z",
sinker: "0.9.0-z",
mapper: "0.9.0-z",
reducer: "0.9.0-z",
reducestreamer: "0.9.0-z",
sessionreducer: "0.9.0-z",
sideinput: "0.9.0-z",
fbsinker: "0.9.0-z",
// meaning the minimum supported go SDK version is 0.9.0
ContainerTypeSourcer: "0.9.0-z",
ContainerTypeSourcetransformer: "0.9.0-z",
ContainerTypeSinker: "0.9.0-z",
ContainerTypeMapper: "0.9.0-z",
ContainerTypeReducer: "0.9.0-z",
ContainerTypeReducestreamer: "0.9.0-z",
ContainerTypeSessionreducer: "0.9.0-z",
ContainerTypeSideinput: "0.9.0-z",
ContainerTypeFbsinker: "0.9.0-z",
},
Java: map[ContainerType]string{
// meaning the minimum supported java SDK version is 0.8.0
sourcer: "0.9.0-z",
sourcetransformer: "0.9.0-z",
sinker: "0.9.0-z",
mapper: "0.9.0-z",
reducer: "0.9.0-z",
reducestreamer: "0.9.0-z",
sessionreducer: "0.9.0-z",
sideinput: "0.9.0-z",
fbsinker: "0.9.0-z",
// meaning the minimum supported go SDK version is 0.9.0
ContainerTypeSourcer: "0.9.0-z",
ContainerTypeSourcetransformer: "0.9.0-z",
ContainerTypeSinker: "0.9.0-z",
ContainerTypeMapper: "0.9.0-z",
ContainerTypeReducer: "0.9.0-z",
ContainerTypeReducestreamer: "0.9.0-z",
ContainerTypeSessionreducer: "0.9.0-z",
ContainerTypeSideinput: "0.9.0-z",
ContainerTypeFbsinker: "0.9.0-z",
},
Rust: map[ContainerType]string{
// meaning the minimum supported rust SDK version is 0.2.0
sourcer: "0.1.0-z",
sourcetransformer: "0.1.0-z",
sinker: "0.1.0-z",
mapper: "0.1.0-z",
reducer: "0.1.0-z",
reducestreamer: "0.1.0-z",
sessionreducer: "0.1.0-z",
sideinput: "0.1.0-z",
fbsinker: "0.1.0-z",
// meaning the minimum supported go SDK version is 0.1.0
ContainerTypeSourcer: "0.1.0-z",
ContainerTypeSourcetransformer: "0.1.0-z",
ContainerTypeSinker: "0.1.0-z",
ContainerTypeMapper: "0.1.0-z",
ContainerTypeReducer: "0.1.0-z",
ContainerTypeReducestreamer: "0.1.0-z",
ContainerTypeSessionreducer: "0.1.0-z",
ContainerTypeSideinput: "0.1.0-z",
ContainerTypeFbsinker: "0.1.0-z",
},
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sideinputs/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isbsvc"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/sdkclient"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
"github.com/numaproj/numaflow/pkg/sdkclient/sideinput"
Expand Down Expand Up @@ -87,6 +88,7 @@ func (sim *sideInputsManager) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentSideInputManager, fmt.Sprintf("%s-%s", sim.pipelineName, sim.sideInput.Name), string(serverinfo.ContainerTypeSideinput), serverInfo.Version, string(serverInfo.Language)).Set(1)

// Create a new gRPC client for Side Input
sideInputClient, err := sideinput.New(serverInfo)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sinks/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSinker), serverInfo.Version, string(serverInfo.Language)).Set(1)

sdkClient, err := sinkclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down Expand Up @@ -183,6 +184,7 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeFbsinker), serverInfo.Version, string(serverInfo.Language)).Set(1)

sdkClient, err := sinkclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize), sdkclient.WithUdsSockAddr(sdkclient.FbSinkAddr))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sources/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSourcer), serverInfo.Version, string(serverInfo.Language)).Set(1)

srcClient, err := sourceclient.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down Expand Up @@ -238,6 +239,7 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSourcetransformer), serverInfo.Version, string(serverInfo.Language)).Set(1)

srcTransformerClient, err := sourcetransformer.New(ctx, serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/udf/map_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeMapper), serverInfo.Version, string(serverInfo.Language)).Set(1)

// track all the resources that need to be closed
var resourcesToClose []io.Closer
Expand Down
3 changes: 3 additions & 0 deletions pkg/udf/reduce_udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeReducestreamer), serverInfo.Version, string(serverInfo.Language)).Set(1)
client, err = reducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize), sdkclient.WithUdsSockAddr(sdkclient.ReduceStreamAddr))
} else {
// Wait for server info to be ready
serverInfo, err = serverinfo.SDKServerInfo(serverinfo.WithServerInfoFilePath(sdkclient.ReduceServerInfoFile))
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeReducer), serverInfo.Version, string(serverInfo.Language)).Set(1)
client, err = reducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
}
if err != nil {
Expand All @@ -134,6 +136,7 @@ func (u *ReduceUDFProcessor) Start(ctx context.Context) error {
if err != nil {
return err
}
metrics.SDKInfo.WithLabelValues(dfv1.ComponentVertex, fmt.Sprintf("%s-%s", pipelineName, vertexName), string(serverinfo.ContainerTypeSessionreducer), serverInfo.Version, string(serverInfo.Language)).Set(1)

client, err := sessionreducer.New(serverInfo, sdkclient.WithMaxMessageSize(maxMessageSize))
if err != nil {
Expand Down
26 changes: 2 additions & 24 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d507e19

Please sign in to comment.