fix(dataflow): make each replica use unique subscription names (#6021)
Following #6020, it was no longer possible to have multiple replicas of dataflow-engine subscribed to the scheduler simultaneously: all replicas connected using the same subscriber name, and a per-name lock was introduced that waits for the old subscriber to disconnect before allowing a new one to proceed.

We update the dataflow-engine code so that each replica connects using its own hostname as the subscriber name. If the hostname cannot be determined, we subscribe with the name seldon-dataflow-engine- followed by the canonical string representation of a UUID v4.

The subscriber name can also be controlled explicitly by passing the --dataflow-replica-id argument or by setting the DATAFLOW_REPLICA_ID environment variable; these take precedence, in that order, over the hostname-based value.
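For illustration only (not part of this commit), the resolution order above can be sketched in plain Kotlin; resolveReplicaId and its parameters are hypothetical names:

import java.net.InetAddress
import java.util.UUID

// Precedence: CLI argument > environment variable > hostname > random UUID fallback.
fun resolveReplicaId(cliArg: String?, envVar: String?): String {
    cliArg?.let { return it }   // 1. --dataflow-replica-id
    envVar?.let { return it }   // 2. DATAFLOW_REPLICA_ID
    return try {
        InetAddress.getLocalHost().hostName   // 3. the replica's hostname
    } catch (_: Exception) {
        // 4. last resort: fixed prefix + canonical UUID v4 string
        "seldon-dataflow-engine-" + UUID.randomUUID().toString()
    }
}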
lc525 authored Nov 1, 2024
1 parent cb5cfbd commit 2c79e77
Showing 4 changed files with 57 additions and 2 deletions.
23 changes: 22 additions & 1 deletion scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
@@ -11,6 +11,7 @@ package io.seldon.dataflow

import com.natpryce.konfig.CommandLineOption
import com.natpryce.konfig.Configuration
import com.natpryce.konfig.ConfigurationMap
import com.natpryce.konfig.ConfigurationProperties
import com.natpryce.konfig.EnvironmentVariables
import com.natpryce.konfig.Key
@@ -25,6 +26,8 @@ import io.klogging.Level
import io.klogging.noCoLogger
import io.seldon.dataflow.kafka.security.KafkaSaslMechanisms
import io.seldon.dataflow.kafka.security.KafkaSecurityProtocols
import java.net.InetAddress
import java.util.UUID

object Cli {
private const val ENV_VAR_PREFIX = "SELDON_"
@@ -34,6 +37,7 @@ object Cli {
val logLevelApplication = Key("log.level.app", enumType(*Level.values()))
val logLevelKafka = Key("log.level.kafka", enumType(*Level.values()))
val namespace = Key("pod.namespace", stringType)
val dataflowReplicaId = Key("dataflow.replica.id", stringType)

// Seldon components
val upstreamHost = Key("upstream.host", stringType)
@@ -75,6 +79,7 @@ object Cli {
logLevelApplication,
logLevelKafka,
namespace,
dataflowReplicaId,
upstreamHost,
upstreamPort,
kafkaBootstrapServers,
@@ -105,10 +110,26 @@

    fun configWith(rawArgs: Array<String>): Configuration {
        val fromProperties = ConfigurationProperties.fromResource("local.properties")
        val fromSystem = getSystemConfig()
        val fromEnv = EnvironmentVariables(prefix = ENV_VAR_PREFIX)
        val fromArgs = parseArguments(rawArgs)

        return fromArgs overriding fromEnv overriding fromProperties
        return fromArgs overriding fromEnv overriding fromSystem overriding fromProperties
    }

    private fun getSystemConfig(): Configuration {
        val dataflowIdPair = this.dataflowReplicaId to getNewDataflowId()
        return ConfigurationMap(dataflowIdPair)
    }

    fun getNewDataflowId(assignRandomUuid: Boolean = false): String {
        if (!assignRandomUuid) {
            try {
                return InetAddress.getLocalHost().hostName
            } catch (_: Exception) {
            }
        }
        return "seldon-dataflow-engine-" + UUID.randomUUID().toString()
    }

    private fun parseArguments(rawArgs: Array<String>): Configuration {
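A note on the layering above: because getSystemConfig() always supplies a generated value (hostname or UUID) for dataflow.replica.id, the static default in local.properties is only a last-resort placeholder and is normally overridden. A minimal, self-contained sketch of konfig's overriding behaviour, not part of the commit and using illustrative values ("dataflow-host-0" stands in for the hostname-based system value):

import com.natpryce.konfig.ConfigurationMap
import com.natpryce.konfig.Key
import com.natpryce.konfig.overriding
import com.natpryce.konfig.stringType

fun main() {
    val replicaId = Key("dataflow.replica.id", stringType)
    // The "system" layer stands in for getSystemConfig(), the "properties" layer for local.properties.
    val fromSystem = ConfigurationMap("dataflow.replica.id" to "dataflow-host-0")
    val fromProperties = ConfigurationMap("dataflow.replica.id" to "seldon-dataflow-engine")
    // The left-hand side wins, as in fromArgs overriding fromEnv overriding fromSystem overriding fromProperties.
    val config = fromSystem overriding fromProperties
    println(config[replicaId]) // prints "dataflow-host-0"
}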
scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
@@ -102,9 +102,11 @@ object Main {
describeRetries = config[Cli.topicDescribeRetries],
describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis],
)
val subscriberId = config[Cli.dataflowReplicaId]

val subscriber =
PipelineSubscriber(
"seldon-dataflow-engine",
subscriberId,
kafkaProperties,
kafkaAdminProperties,
kafkaStreamsParams,
1 change: 1 addition & 0 deletions scheduler/data-flow/src/main/resources/local.properties
@@ -1,5 +1,6 @@
log.level.app=INFO
log.level.kafka=WARN
dataflow.replica.id=seldon-dataflow-engine
kafka.bootstrap.servers=localhost:9092
kafka.consumer.prefix=
kafka.security.protocol=PLAINTEXT
31 changes: 31 additions & 0 deletions scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt
@@ -16,9 +16,15 @@ import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.MethodSource
import strikt.api.expectCatching
import strikt.api.expectThat
import strikt.assertions.hasLength
import strikt.assertions.isEqualTo
import strikt.assertions.isNotEqualTo
import strikt.assertions.isSuccess
import strikt.assertions.startsWith
import java.util.UUID
import java.util.stream.Stream
import kotlin.test.Test

internal class CliTest {
@DisplayName("Passing auth mechanism via cli argument")
@@ -36,6 +42,31 @@ internal class CliTest {
            .isEqualTo(expectedMechanism)
    }

    @Test
    fun `should handle dataflow replica id`() {
        val cliDefault = Cli.configWith(arrayOf<String>())
        val testReplicaId = "dataflow-id-1"
        val cli = Cli.configWith(arrayOf("--dataflow-replica-id", testReplicaId))

        expectThat(cliDefault[Cli.dataflowReplicaId]) {
            isNotEqualTo("seldon-dataflow-engine")
        }
        expectThat(cli[Cli.dataflowReplicaId]) {
            isEqualTo(testReplicaId)
        }

        // test random Uuid (v4)
        val expectedReplicaIdPrefix = "seldon-dataflow-engine-"
        val uuidStringLength = 36
        val randomReplicaUuid = Cli.getNewDataflowId(true)
        expectThat(randomReplicaUuid) {
            startsWith(expectedReplicaIdPrefix)
            hasLength(expectedReplicaIdPrefix.length + uuidStringLength)
        }
        expectCatching { UUID.fromString(randomReplicaUuid.removePrefix(expectedReplicaIdPrefix)) }
            .isSuccess()
    }

    companion object {
        @JvmStatic
        private fun saslMechanisms(): Stream<Arguments> {