From 895a7780410b7bb5a43a4ab6f4dd55c1c145561f Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 25 Sep 2024 17:47:12 -0400 Subject: [PATCH] feat: container-type level version compatibility check (#2087) Signed-off-by: Keran Yang --- pkg/sdkclient/const.go | 2 - pkg/sdkclient/serverinfo/serverinfo.go | 26 +++++++- pkg/sdkclient/serverinfo/serverinfo_test.go | 38 ++++++++--- pkg/sdkclient/serverinfo/types.go | 74 ++++++++++++++++++--- 4 files changed, 115 insertions(+), 25 deletions(-) diff --git a/pkg/sdkclient/const.go b/pkg/sdkclient/const.go index b7ace077cf..54f88b66cb 100644 --- a/pkg/sdkclient/const.go +++ b/pkg/sdkclient/const.go @@ -38,7 +38,6 @@ const ( // Server information file configs MapServerInfoFile = "/var/run/numaflow/mapper-server-info" - MapStreamServerInfoFile = "/var/run/numaflow/mapstreamer-server-info" ReduceServerInfoFile = "/var/run/numaflow/reducer-server-info" ReduceStreamServerInfoFile = "/var/run/numaflow/reducestreamer-server-info" SessionReduceServerInfoFile = "/var/run/numaflow/sessionreducer-server-info" @@ -47,5 +46,4 @@ const ( FbSinkServerInfoFile = "/var/run/numaflow/fb-sinker-server-info" SourceServerInfoFile = "/var/run/numaflow/sourcer-server-info" SourceTransformerServerInfoFile = "/var/run/numaflow/sourcetransformer-server-info" - BatchMapServerInfoFile = "/var/run/numaflow/batchmapper-server-info" ) diff --git a/pkg/sdkclient/serverinfo/serverinfo.go b/pkg/sdkclient/serverinfo/serverinfo.go index d01acd1cda..f94d83e072 100644 --- a/pkg/sdkclient/serverinfo/serverinfo.go +++ b/pkg/sdkclient/serverinfo/serverinfo.go @@ -67,6 +67,10 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, err minNumaflowVersion := serverInfo.MinimumNumaflowVersion sdkLanguage := serverInfo.Language numaflowVersion := numaflow.GetVersion().Version + containerType, err := getContainerType(filePath) + if err != nil { + return nil, fmt.Errorf("failed to get container type: %w", err) + } // If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an // error writing server info file on the SDK side @@ -87,7 +91,7 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, err if sdkVersion == "" || sdkLanguage == "" { log.Printf("warning: failed to get the SDK version/language, skipping SDK version compatibility check") } else { - if err := checkSDKCompatibility(sdkVersion, sdkLanguage, minimumSupportedSDKVersions); err != nil { + if err := checkSDKCompatibility(sdkVersion, sdkLanguage, containerType, minimumSupportedSDKVersions); err != nil { return nil, fmt.Errorf("SDK version %s does not satisfy the minimum required by numaflow version %s: %w", sdkVersion, numaflowVersion, err) } @@ -176,8 +180,11 @@ func checkNumaflowCompatibility(numaflowVersion string, minNumaflowVersion strin } // checkSDKCompatibility checks if the current SDK version is compatible with the numaflow version -func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, minSupportedSDKVersions sdkConstraints) error { - if sdkRequiredVersion, ok := minSupportedSDKVersions[sdkLanguage]; ok { +func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, containerType ContainerType, minSupportedSDKVersions sdkConstraints) error { + if _, ok := minSupportedSDKVersions[sdkLanguage]; !ok { + return fmt.Errorf("SDK language %s is not supported", sdkLanguage) + } + if sdkRequiredVersion, ok := minSupportedSDKVersions[sdkLanguage][containerType]; ok { sdkConstraint := fmt.Sprintf(">= %s", sdkRequiredVersion) if sdkLanguage == Python { // Python pre-releases/releases follow PEP440 specification which requires a different library for parsing @@ -206,6 +213,19 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, minSupported sdkVersionSemVer.String(), humanReadable(sdkRequiredVersion), err) } } + } else { + return fmt.Errorf("SDK container type %s is not supported", containerType) } return nil } + +// getContainerType returns the container type from the server info file path +// serverInfoFilePath is in the format of "/var/run/numaflow/{ContainerType}-server-info" +func getContainerType(serverInfoFilePath string) (ContainerType, error) { + splits := strings.Split(serverInfoFilePath, "/") + if containerType := strings.TrimSuffix(splits[len(splits)-1], "-server-info"); containerType == "" { + return "", fmt.Errorf("failed to get container type from server info file path: %s", serverInfoFilePath) + } else { + return ContainerType(containerType), nil + } +} diff --git a/pkg/sdkclient/serverinfo/serverinfo_test.go b/pkg/sdkclient/serverinfo/serverinfo_test.go index ad7f06e690..105775de17 100644 --- a/pkg/sdkclient/serverinfo/serverinfo_test.go +++ b/pkg/sdkclient/serverinfo/serverinfo_test.go @@ -29,7 +29,7 @@ import ( ) func Test_SDKServerInfo(t *testing.T) { - filepath := os.TempDir() + "/server-info" + filepath := os.TempDir() + "/sourcer-server-info" defer os.Remove(filepath) info := &ServerInfo{ Protocol: TCP, @@ -185,10 +185,18 @@ func Test_CheckNumaflowCompatibility(t *testing.T) { // this test suite is to test SDK compatibility check when all the minimum-supported versions are stable releases func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) { var testMinimumSupportedSDKVersions = sdkConstraints{ - Python: "0.6.0rc100", - Go: "0.6.0-z", - Java: "0.6.0-z", - Rust: "0.1.0-z", + Python: map[ContainerType]string{ + sourcer: "0.6.0rc100", + }, + Go: map[ContainerType]string{ + sourcer: "0.6.0-z", + }, + Java: map[ContainerType]string{ + sourcer: "0.6.0-z", + }, + Rust: map[ContainerType]string{ + sourcer: "0.1.0-z", + }, } tests := []struct { name string @@ -275,7 +283,7 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, tt.minimumSupportedSDKVersions) + err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions) if tt.shouldErr { assert.Error(t, err, "Expected error") assert.Contains(t, err.Error(), tt.errMessage) @@ -289,10 +297,18 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) { // this test suite is to test SDK compatibility check when all the minimum-supported versions are pre-releases func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) { var testMinimumSupportedSDKVersions = sdkConstraints{ - Python: "0.6.0b1", - Go: "0.6.0-rc2", - Java: "0.6.0-rc2", - Rust: "0.1.0-rc3", + Python: map[ContainerType]string{ + sourcer: "0.6.0b1", + }, + Go: map[ContainerType]string{ + sourcer: "0.6.0-rc2", + }, + Java: map[ContainerType]string{ + sourcer: "0.6.0-rc2", + }, + Rust: map[ContainerType]string{ + sourcer: "0.1.0-rc3", + }, } tests := []struct { name string @@ -379,7 +395,7 @@ func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, tt.minimumSupportedSDKVersions) + err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions) if tt.shouldErr { assert.Error(t, err, "Expected error") assert.Contains(t, err.Error(), tt.errMessage) diff --git a/pkg/sdkclient/serverinfo/types.go b/pkg/sdkclient/serverinfo/types.go index 9e4a152d03..23f6e2394d 100644 --- a/pkg/sdkclient/serverinfo/types.go +++ b/pkg/sdkclient/serverinfo/types.go @@ -27,7 +27,23 @@ const ( Rust Language = "rust" ) -type sdkConstraints map[Language]string +type ContainerType string + +// the string content matches the corresponding server info file name. +// DO NOT change it unless the server info file name is changed. +const ( + sourcer ContainerType = "sourcer" + sourcetransformer ContainerType = "sourcetransformer" + sinker ContainerType = "sinker" + mapper ContainerType = "mapper" + reducer ContainerType = "reducer" + reducestreamer ContainerType = "reducestreamer" + sessionreducer ContainerType = "sessionreducer" + sideinput ContainerType = "sideinput" + fbsinker ContainerType = "fb-sinker" +) + +type sdkConstraints map[Language]map[ContainerType]string /* minimumSupportedSDKVersions is the minimum supported version of each SDK for the current numaflow version. @@ -70,14 +86,54 @@ A constraint ">=0.8.0-z" will match any pre-release version of 0.8.0, including More details about version comparison can be found in the PEP 440 and semver documentation. */ var minimumSupportedSDKVersions = sdkConstraints{ - // meaning the minimum supported python SDK version is 0.8.0 - Python: "0.8.0rc100", - // meaning the minimum supported go SDK version is 0.8.0 - Go: "0.8.0-z", - // meaning the minimum supported java SDK version is 0.8.0 - Java: "0.8.0-z", - // meaning the minimum supported rust SDK version is 0.1.0 - Rust: "0.1.0-z", + Python: map[ContainerType]string{ + // meaning the minimum supported python SDK version is 0.8.0 + sourcer: "0.8.0rc100", + sourcetransformer: "0.8.0rc100", + sinker: "0.8.0rc100", + mapper: "0.8.0rc100", + reducer: "0.8.0rc100", + reducestreamer: "0.8.0rc100", + sessionreducer: "0.8.0rc100", + sideinput: "0.8.0rc100", + fbsinker: "0.8.0rc100", + }, + Go: map[ContainerType]string{ + // meaning the minimum supported go SDK version is 0.8.0 + sourcer: "0.8.0-z", + sourcetransformer: "0.8.0-z", + sinker: "0.8.0-z", + mapper: "0.8.0-z", + reducer: "0.8.0-z", + reducestreamer: "0.8.0-z", + sessionreducer: "0.8.0-z", + sideinput: "0.8.0-z", + fbsinker: "0.8.0-z", + }, + Java: map[ContainerType]string{ + // meaning the minimum supported go SDK version is 0.8.0 + sourcer: "0.8.0-z", + sourcetransformer: "0.8.0-z", + sinker: "0.8.0-z", + mapper: "0.8.0-z", + reducer: "0.8.0-z", + reducestreamer: "0.8.0-z", + sessionreducer: "0.8.0-z", + sideinput: "0.8.0-z", + fbsinker: "0.8.0-z", + }, + Rust: map[ContainerType]string{ + // meaning the minimum supported go SDK version is 0.1.0 + sourcer: "0.1.0-z", + sourcetransformer: "0.1.0-z", + sinker: "0.1.0-z", + mapper: "0.1.0-z", + reducer: "0.1.0-z", + reducestreamer: "0.1.0-z", + sessionreducer: "0.1.0-z", + sideinput: "0.1.0-z", + fbsinker: "0.1.0-z", + }, } // humanReadable returns the human-readable minimum supported version.