Skip to content

Commit

Permalink
fix(dataflow): make each replica use unique subscription names
Browse files Browse the repository at this point in the history
Following #6020, it was no longer possible to have multiple replicas of
dataflow-engine subscribing simultaneously to the scheduler, because
all were connecting with the same subscriber name, and a lock was added
per name, first waiting the disconnection of the old subscriber before
allowing a new one to progress.

We update the dataflow-engine code so that each replica connects with
its own hostname as the subscriber name. If the hostname can not
be determined, we subscribe with the name `seldon-dataflow-engine-` followed
by a hex string of 50 random characters.

The subscriber name can also be explicitly controlled by passing the
`--dataflow-replica-id` argument or the `DATAFLOW_REPLICA_ID` environment
variable, wich will take precedence, in that order, to setting the value
as the hostname.
  • Loading branch information
lc525 committed Nov 1, 2024
1 parent 294b5f8 commit 4ecf712
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
22 changes: 21 additions & 1 deletion scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package io.seldon.dataflow

import com.natpryce.konfig.CommandLineOption
import com.natpryce.konfig.Configuration
import com.natpryce.konfig.ConfigurationMap
import com.natpryce.konfig.ConfigurationProperties
import com.natpryce.konfig.EnvironmentVariables
import com.natpryce.konfig.Key
Expand All @@ -25,6 +26,7 @@ import io.klogging.Level
import io.klogging.noCoLogger
import io.seldon.dataflow.kafka.security.KafkaSaslMechanisms
import io.seldon.dataflow.kafka.security.KafkaSecurityProtocols
import java.net.InetAddress

object Cli {
private const val ENV_VAR_PREFIX = "SELDON_"
Expand All @@ -34,6 +36,7 @@ object Cli {
val logLevelApplication = Key("log.level.app", enumType(*Level.values()))
val logLevelKafka = Key("log.level.kafka", enumType(*Level.values()))
val namespace = Key("pod.namespace", stringType)
val dataflowReplicaId = Key("dataflow.replica.id", stringType)

// Seldon components
val upstreamHost = Key("upstream.host", stringType)
Expand Down Expand Up @@ -75,6 +78,7 @@ object Cli {
logLevelApplication,
logLevelKafka,
namespace,
dataflowReplicaId,
upstreamHost,
upstreamPort,
kafkaBootstrapServers,
Expand Down Expand Up @@ -105,10 +109,26 @@ object Cli {

fun configWith(rawArgs: Array<String>): Configuration {
val fromProperties = ConfigurationProperties.fromResource("local.properties")
val fromSystem = getSystemConfig()
val fromEnv = EnvironmentVariables(prefix = ENV_VAR_PREFIX)
val fromArgs = parseArguments(rawArgs)

return fromArgs overriding fromEnv overriding fromProperties
return fromArgs overriding fromEnv overriding fromSystem overriding fromProperties
}

private fun getSystemConfig(): Configuration {
val dataflowIdPair = this.dataflowReplicaId to getDataflowId()
return ConfigurationMap(dataflowIdPair)
}

private fun getDataflowId(): String {
return try {
InetAddress.getLocalHost().hostName
} catch (e: Exception) {
val hexCharPool: List<Char> = ('a'..'f') + ('0'..'9')
val randomIdLength = 50
return "seldon-dataflow-engine-" + List(randomIdLength) { hexCharPool.random() }.joinToString("")
}
}

private fun parseArguments(rawArgs: Array<String>): Configuration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ object Main {
describeRetries = config[Cli.topicDescribeRetries],
describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis],
)
val subscriberId = config[Cli.dataflowReplicaId]

val subscriber =
PipelineSubscriber(
"seldon-dataflow-engine",
subscriberId,
kafkaProperties,
kafkaAdminProperties,
kafkaStreamsParams,
Expand Down
1 change: 1 addition & 0 deletions scheduler/data-flow/src/main/resources/local.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
log.level.app=INFO
log.level.kafka=WARN
dataflow.replica.id=seldon-dataflow-engine
kafka.bootstrap.servers=localhost:9092
kafka.consumer.prefix=
kafka.security.protocol=PLAINTEXT
Expand Down
17 changes: 17 additions & 0 deletions scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ import org.junit.jupiter.params.provider.Arguments
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.MethodSource
import strikt.api.expectCatching
import strikt.api.expectThat
import strikt.assertions.isEqualTo
import strikt.assertions.isNotEqualTo
import strikt.assertions.isSuccess
import java.util.stream.Stream
import kotlin.test.Test

internal class CliTest {
@DisplayName("Passing auth mechanism via cli argument")
Expand All @@ -36,6 +39,20 @@ internal class CliTest {
.isEqualTo(expectedMechanism)
}

@Test
fun `should handle dataflow replica id`() {
val cliDefault = Cli.configWith(arrayOf<String>())
val testReplicaId = "dataflow-id-1"
val cli = Cli.configWith(arrayOf("--dataflow-replica-id", testReplicaId))

expectThat(cliDefault[Cli.dataflowReplicaId]) {
isNotEqualTo("seldon-dataflow-engine")
}
expectThat(cli[Cli.dataflowReplicaId]) {
isEqualTo(testReplicaId)
}
}

companion object {
@JvmStatic
private fun saslMechanisms(): Stream<Arguments> {
Expand Down

0 comments on commit 4ecf712

Please sign in to comment.