Skip to content

Commit

Permalink
feat: container-type level version compatibility check (numaproj#2087)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
  • Loading branch information
KeranYang authored Sep 25, 2024
1 parent 6d1ebd0 commit 895a778
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 25 deletions.
2 changes: 0 additions & 2 deletions pkg/sdkclient/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (

// Server information file configs
MapServerInfoFile = "/var/run/numaflow/mapper-server-info"
MapStreamServerInfoFile = "/var/run/numaflow/mapstreamer-server-info"
ReduceServerInfoFile = "/var/run/numaflow/reducer-server-info"
ReduceStreamServerInfoFile = "/var/run/numaflow/reducestreamer-server-info"
SessionReduceServerInfoFile = "/var/run/numaflow/sessionreducer-server-info"
Expand All @@ -47,5 +46,4 @@ const (
FbSinkServerInfoFile = "/var/run/numaflow/fb-sinker-server-info"
SourceServerInfoFile = "/var/run/numaflow/sourcer-server-info"
SourceTransformerServerInfoFile = "/var/run/numaflow/sourcetransformer-server-info"
BatchMapServerInfoFile = "/var/run/numaflow/batchmapper-server-info"
)
26 changes: 23 additions & 3 deletions pkg/sdkclient/serverinfo/serverinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, err
minNumaflowVersion := serverInfo.MinimumNumaflowVersion
sdkLanguage := serverInfo.Language
numaflowVersion := numaflow.GetVersion().Version
containerType, err := getContainerType(filePath)
if err != nil {
return nil, fmt.Errorf("failed to get container type: %w", err)
}

// If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an
// error writing server info file on the SDK side
Expand All @@ -87,7 +91,7 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, err
if sdkVersion == "" || sdkLanguage == "" {
log.Printf("warning: failed to get the SDK version/language, skipping SDK version compatibility check")
} else {
if err := checkSDKCompatibility(sdkVersion, sdkLanguage, minimumSupportedSDKVersions); err != nil {
if err := checkSDKCompatibility(sdkVersion, sdkLanguage, containerType, minimumSupportedSDKVersions); err != nil {
return nil, fmt.Errorf("SDK version %s does not satisfy the minimum required by numaflow version %s: %w",
sdkVersion, numaflowVersion, err)
}
Expand Down Expand Up @@ -176,8 +180,11 @@ func checkNumaflowCompatibility(numaflowVersion string, minNumaflowVersion strin
}

// checkSDKCompatibility checks if the current SDK version is compatible with the numaflow version
func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, minSupportedSDKVersions sdkConstraints) error {
if sdkRequiredVersion, ok := minSupportedSDKVersions[sdkLanguage]; ok {
func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, containerType ContainerType, minSupportedSDKVersions sdkConstraints) error {
if _, ok := minSupportedSDKVersions[sdkLanguage]; !ok {
return fmt.Errorf("SDK language %s is not supported", sdkLanguage)
}
if sdkRequiredVersion, ok := minSupportedSDKVersions[sdkLanguage][containerType]; ok {
sdkConstraint := fmt.Sprintf(">= %s", sdkRequiredVersion)
if sdkLanguage == Python {
// Python pre-releases/releases follow PEP440 specification which requires a different library for parsing
Expand Down Expand Up @@ -206,6 +213,19 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, minSupported
sdkVersionSemVer.String(), humanReadable(sdkRequiredVersion), err)
}
}
} else {
return fmt.Errorf("SDK container type %s is not supported", containerType)
}
return nil
}

// getContainerType returns the container type from the server info file path
// serverInfoFilePath is in the format of "/var/run/numaflow/{ContainerType}-server-info"
func getContainerType(serverInfoFilePath string) (ContainerType, error) {
splits := strings.Split(serverInfoFilePath, "/")
if containerType := strings.TrimSuffix(splits[len(splits)-1], "-server-info"); containerType == "" {
return "", fmt.Errorf("failed to get container type from server info file path: %s", serverInfoFilePath)
} else {
return ContainerType(containerType), nil
}
}
38 changes: 27 additions & 11 deletions pkg/sdkclient/serverinfo/serverinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func Test_SDKServerInfo(t *testing.T) {
filepath := os.TempDir() + "/server-info"
filepath := os.TempDir() + "/sourcer-server-info"
defer os.Remove(filepath)
info := &ServerInfo{
Protocol: TCP,
Expand Down Expand Up @@ -185,10 +185,18 @@ func Test_CheckNumaflowCompatibility(t *testing.T) {
// this test suite is to test SDK compatibility check when all the minimum-supported versions are stable releases
func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
var testMinimumSupportedSDKVersions = sdkConstraints{
Python: "0.6.0rc100",
Go: "0.6.0-z",
Java: "0.6.0-z",
Rust: "0.1.0-z",
Python: map[ContainerType]string{
sourcer: "0.6.0rc100",
},
Go: map[ContainerType]string{
sourcer: "0.6.0-z",
},
Java: map[ContainerType]string{
sourcer: "0.6.0-z",
},
Rust: map[ContainerType]string{
sourcer: "0.1.0-z",
},
}
tests := []struct {
name string
Expand Down Expand Up @@ -275,7 +283,7 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, tt.minimumSupportedSDKVersions)
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
if tt.shouldErr {
assert.Error(t, err, "Expected error")
assert.Contains(t, err.Error(), tt.errMessage)
Expand All @@ -289,10 +297,18 @@ func Test_CheckSDKCompatibility_MinimumBeingStableReleases(t *testing.T) {
// this test suite is to test SDK compatibility check when all the minimum-supported versions are pre-releases
func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
var testMinimumSupportedSDKVersions = sdkConstraints{
Python: "0.6.0b1",
Go: "0.6.0-rc2",
Java: "0.6.0-rc2",
Rust: "0.1.0-rc3",
Python: map[ContainerType]string{
sourcer: "0.6.0b1",
},
Go: map[ContainerType]string{
sourcer: "0.6.0-rc2",
},
Java: map[ContainerType]string{
sourcer: "0.6.0-rc2",
},
Rust: map[ContainerType]string{
sourcer: "0.1.0-rc3",
},
}
tests := []struct {
name string
Expand Down Expand Up @@ -379,7 +395,7 @@ func Test_CheckSDKCompatibility_MinimumBeingPreReleases(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, tt.minimumSupportedSDKVersions)
err := checkSDKCompatibility(tt.sdkVersion, tt.sdkLanguage, sourcer, tt.minimumSupportedSDKVersions)
if tt.shouldErr {
assert.Error(t, err, "Expected error")
assert.Contains(t, err.Error(), tt.errMessage)
Expand Down
74 changes: 65 additions & 9 deletions pkg/sdkclient/serverinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,23 @@ const (
Rust Language = "rust"
)

type sdkConstraints map[Language]string
type ContainerType string

// the string content matches the corresponding server info file name.
// DO NOT change it unless the server info file name is changed.
const (
sourcer ContainerType = "sourcer"
sourcetransformer ContainerType = "sourcetransformer"
sinker ContainerType = "sinker"
mapper ContainerType = "mapper"
reducer ContainerType = "reducer"
reducestreamer ContainerType = "reducestreamer"
sessionreducer ContainerType = "sessionreducer"
sideinput ContainerType = "sideinput"
fbsinker ContainerType = "fb-sinker"
)

type sdkConstraints map[Language]map[ContainerType]string

/*
minimumSupportedSDKVersions is the minimum supported version of each SDK for the current numaflow version.
Expand Down Expand Up @@ -70,14 +86,54 @@ A constraint ">=0.8.0-z" will match any pre-release version of 0.8.0, including
More details about version comparison can be found in the PEP 440 and semver documentation.
*/
var minimumSupportedSDKVersions = sdkConstraints{
// meaning the minimum supported python SDK version is 0.8.0
Python: "0.8.0rc100",
// meaning the minimum supported go SDK version is 0.8.0
Go: "0.8.0-z",
// meaning the minimum supported java SDK version is 0.8.0
Java: "0.8.0-z",
// meaning the minimum supported rust SDK version is 0.1.0
Rust: "0.1.0-z",
Python: map[ContainerType]string{
// meaning the minimum supported python SDK version is 0.8.0
sourcer: "0.8.0rc100",
sourcetransformer: "0.8.0rc100",
sinker: "0.8.0rc100",
mapper: "0.8.0rc100",
reducer: "0.8.0rc100",
reducestreamer: "0.8.0rc100",
sessionreducer: "0.8.0rc100",
sideinput: "0.8.0rc100",
fbsinker: "0.8.0rc100",
},
Go: map[ContainerType]string{
// meaning the minimum supported go SDK version is 0.8.0
sourcer: "0.8.0-z",
sourcetransformer: "0.8.0-z",
sinker: "0.8.0-z",
mapper: "0.8.0-z",
reducer: "0.8.0-z",
reducestreamer: "0.8.0-z",
sessionreducer: "0.8.0-z",
sideinput: "0.8.0-z",
fbsinker: "0.8.0-z",
},
Java: map[ContainerType]string{
// meaning the minimum supported go SDK version is 0.8.0
sourcer: "0.8.0-z",
sourcetransformer: "0.8.0-z",
sinker: "0.8.0-z",
mapper: "0.8.0-z",
reducer: "0.8.0-z",
reducestreamer: "0.8.0-z",
sessionreducer: "0.8.0-z",
sideinput: "0.8.0-z",
fbsinker: "0.8.0-z",
},
Rust: map[ContainerType]string{
// meaning the minimum supported go SDK version is 0.1.0
sourcer: "0.1.0-z",
sourcetransformer: "0.1.0-z",
sinker: "0.1.0-z",
mapper: "0.1.0-z",
reducer: "0.1.0-z",
reducestreamer: "0.1.0-z",
sessionreducer: "0.1.0-z",
sideinput: "0.1.0-z",
fbsinker: "0.1.0-z",
},
}

// humanReadable returns the human-readable minimum supported version.
Expand Down

0 comments on commit 895a778

Please sign in to comment.