Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dataflow): make each replica use unique subscription names #6021

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package io.seldon.dataflow

import com.natpryce.konfig.CommandLineOption
import com.natpryce.konfig.Configuration
import com.natpryce.konfig.ConfigurationMap
import com.natpryce.konfig.ConfigurationProperties
import com.natpryce.konfig.EnvironmentVariables
import com.natpryce.konfig.Key
Expand All @@ -25,6 +26,8 @@ import io.klogging.Level
import io.klogging.noCoLogger
import io.seldon.dataflow.kafka.security.KafkaSaslMechanisms
import io.seldon.dataflow.kafka.security.KafkaSecurityProtocols
import java.net.InetAddress
import java.util.UUID

object Cli {
private const val ENV_VAR_PREFIX = "SELDON_"
Expand All @@ -34,6 +37,7 @@ object Cli {
val logLevelApplication = Key("log.level.app", enumType(*Level.values()))
val logLevelKafka = Key("log.level.kafka", enumType(*Level.values()))
val namespace = Key("pod.namespace", stringType)
val dataflowReplicaId = Key("dataflow.replica.id", stringType)

// Seldon components
val upstreamHost = Key("upstream.host", stringType)
Expand Down Expand Up @@ -75,6 +79,7 @@ object Cli {
logLevelApplication,
logLevelKafka,
namespace,
dataflowReplicaId,
upstreamHost,
upstreamPort,
kafkaBootstrapServers,
Expand Down Expand Up @@ -105,10 +110,26 @@ object Cli {

fun configWith(rawArgs: Array<String>): Configuration {
val fromProperties = ConfigurationProperties.fromResource("local.properties")
val fromSystem = getSystemConfig()
val fromEnv = EnvironmentVariables(prefix = ENV_VAR_PREFIX)
val fromArgs = parseArguments(rawArgs)

return fromArgs overriding fromEnv overriding fromProperties
return fromArgs overriding fromEnv overriding fromSystem overriding fromProperties
}

private fun getSystemConfig(): Configuration {
val dataflowIdPair = this.dataflowReplicaId to getNewDataflowId()
return ConfigurationMap(dataflowIdPair)
}

fun getNewDataflowId(assignRandomUuid: Boolean = false): String {
if (!assignRandomUuid) {
try {
return InetAddress.getLocalHost().hostName
} catch (_: Exception) {
}
}
return "seldon-dataflow-engine-" + UUID.randomUUID().toString()
}

private fun parseArguments(rawArgs: Array<String>): Configuration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ object Main {
describeRetries = config[Cli.topicDescribeRetries],
describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis],
)
val subscriberId = config[Cli.dataflowReplicaId]

val subscriber =
PipelineSubscriber(
"seldon-dataflow-engine",
subscriberId,
kafkaProperties,
kafkaAdminProperties,
kafkaStreamsParams,
Expand Down
1 change: 1 addition & 0 deletions scheduler/data-flow/src/main/resources/local.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
log.level.app=INFO
log.level.kafka=WARN
dataflow.replica.id=seldon-dataflow-engine
kafka.bootstrap.servers=localhost:9092
kafka.consumer.prefix=
kafka.security.protocol=PLAINTEXT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.MethodSource
import strikt.api.expectCatching
import strikt.api.expectThat
import strikt.assertions.hasLength
import strikt.assertions.isEqualTo
import strikt.assertions.isNotEqualTo
import strikt.assertions.isSuccess
import strikt.assertions.startsWith
import java.util.UUID
import java.util.stream.Stream
import kotlin.test.Test

internal class CliTest {
@DisplayName("Passing auth mechanism via cli argument")
Expand All @@ -36,6 +42,31 @@ internal class CliTest {
.isEqualTo(expectedMechanism)
}

@Test
fun `should handle dataflow replica id`() {
val cliDefault = Cli.configWith(arrayOf<String>())
val testReplicaId = "dataflow-id-1"
val cli = Cli.configWith(arrayOf("--dataflow-replica-id", testReplicaId))

expectThat(cliDefault[Cli.dataflowReplicaId]) {
isNotEqualTo("seldon-dataflow-engine")
}
expectThat(cli[Cli.dataflowReplicaId]) {
isEqualTo(testReplicaId)
}

// test random Uuid (v4)
val expectedReplicaIdPrefix = "seldon-dataflow-engine-"
val uuidStringLength = 36
val randomReplicaUuid = Cli.getNewDataflowId(true)
expectThat(randomReplicaUuid) {
startsWith(expectedReplicaIdPrefix)
hasLength(expectedReplicaIdPrefix.length + uuidStringLength)
}
expectCatching { UUID.fromString(randomReplicaUuid.removePrefix(expectedReplicaIdPrefix)) }
.isSuccess()
}

companion object {
@JvmStatic
private fun saslMechanisms(): Stream<Arguments> {
Expand Down
Loading