Give each dataflow engine replica its own subscriber id.

Summary of the changes in this patch:
  * Cli.kt: adds the `dataflow.replica.id` key, appends it to the existing list of keys, and
    inserts a system-derived configuration layer that overrides local.properties and is itself
    overridden by SELDON_-prefixed environment variables and by command-line arguments.
    The system-derived id is the local host name, or "seldon-dataflow-engine-" followed by
    50 random hex characters when the host name lookup throws.
  * Main.kt: PipelineSubscriber is given config[Cli.dataflowReplicaId] instead of the
    hard-coded "seldon-dataflow-engine".
  * local.properties: adds dataflow.replica.id=seldon-dataflow-engine.
  * CliTest.kt: adds a test that the default id differs from "seldon-dataflow-engine" and that
    --dataflow-replica-id on the command line takes effect.

Review notes (observations only; the hunks below are unchanged in content):
  * The system layer always defines dataflow.replica.id and overrides local.properties, so the
    value added to local.properties is never the effective one; the new test relies on this.
  * In getDataflowId, `try` is already used as an expression, so the `return` inside the catch
    branch is redundant.

diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
index 1e5485aaea..015983e674 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Cli.kt
@@ -11,6 +11,7 @@ package io.seldon.dataflow
 
 import com.natpryce.konfig.CommandLineOption
 import com.natpryce.konfig.Configuration
+import com.natpryce.konfig.ConfigurationMap
 import com.natpryce.konfig.ConfigurationProperties
 import com.natpryce.konfig.EnvironmentVariables
 import com.natpryce.konfig.Key
@@ -25,6 +26,7 @@ import io.klogging.Level
 import io.klogging.noCoLogger
 import io.seldon.dataflow.kafka.security.KafkaSaslMechanisms
 import io.seldon.dataflow.kafka.security.KafkaSecurityProtocols
+import java.net.InetAddress
 
 object Cli {
     private const val ENV_VAR_PREFIX = "SELDON_"
@@ -34,6 +36,7 @@ object Cli {
     val logLevelApplication = Key("log.level.app", enumType(*Level.values()))
     val logLevelKafka = Key("log.level.kafka", enumType(*Level.values()))
     val namespace = Key("pod.namespace", stringType)
+    val dataflowReplicaId = Key("dataflow.replica.id", stringType)
 
     // Seldon components
     val upstreamHost = Key("upstream.host", stringType)
@@ -75,6 +78,7 @@ object Cli {
             logLevelApplication,
             logLevelKafka,
             namespace,
+            dataflowReplicaId,
             upstreamHost,
             upstreamPort,
             kafkaBootstrapServers,
@@ -105,10 +109,26 @@ object Cli {
 
     fun configWith(rawArgs: Array<String>): Configuration {
         val fromProperties = ConfigurationProperties.fromResource("local.properties")
+        val fromSystem = getSystemConfig()
         val fromEnv = EnvironmentVariables(prefix = ENV_VAR_PREFIX)
         val fromArgs = parseArguments(rawArgs)
 
-        return fromArgs overriding fromEnv overriding fromProperties
+        return fromArgs overriding fromEnv overriding fromSystem overriding fromProperties
+    }
+
+    private fun getSystemConfig(): Configuration {
+        val dataflowIdPair = this.dataflowReplicaId to getDataflowId()
+        return ConfigurationMap(dataflowIdPair)
+    }
+
+    private fun getDataflowId(): String {
+        return try {
+            InetAddress.getLocalHost().hostName
+        } catch (e: Exception) {
+            val hexCharPool: List<Char> = ('a'..'f') + ('0'..'9')
+            val randomIdLength = 50
+            return "seldon-dataflow-engine-" + List(randomIdLength) { hexCharPool.random() }.joinToString("")
+        }
     }
 
     private fun parseArguments(rawArgs: Array<String>): Configuration {
diff --git a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
index 8d4a899eaa..b064a974f5 100644
--- a/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
+++ b/scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/Main.kt
@@ -102,9 +102,11 @@ object Main {
                 describeRetries = config[Cli.topicDescribeRetries],
                 describeRetryDelayMillis = config[Cli.topicDescribeRetryDelayMillis],
             )
+        val subscriberId = config[Cli.dataflowReplicaId]
+
         val subscriber =
             PipelineSubscriber(
-                "seldon-dataflow-engine",
+                subscriberId,
                 kafkaProperties,
                 kafkaAdminProperties,
                 kafkaStreamsParams,
diff --git a/scheduler/data-flow/src/main/resources/local.properties b/scheduler/data-flow/src/main/resources/local.properties
index 46a7218380..68b3bd408d 100644
--- a/scheduler/data-flow/src/main/resources/local.properties
+++ b/scheduler/data-flow/src/main/resources/local.properties
@@ -1,5 +1,6 @@
 log.level.app=INFO
 log.level.kafka=WARN
+dataflow.replica.id=seldon-dataflow-engine
 kafka.bootstrap.servers=localhost:9092
 kafka.consumer.prefix=
 kafka.security.protocol=PLAINTEXT
diff --git a/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt b/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt
index 9011ff3d4c..52f6354690 100644
--- a/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt
+++ b/scheduler/data-flow/src/test/kotlin/io/seldon/dataflow/CliTest.kt
@@ -16,9 +16,12 @@ import org.junit.jupiter.params.provider.Arguments
 import org.junit.jupiter.params.provider.Arguments.arguments
 import org.junit.jupiter.params.provider.MethodSource
 import strikt.api.expectCatching
+import strikt.api.expectThat
 import strikt.assertions.isEqualTo
+import strikt.assertions.isNotEqualTo
 import strikt.assertions.isSuccess
 import java.util.stream.Stream
+import kotlin.test.Test
 
 internal class CliTest {
     @DisplayName("Passing auth mechanism via cli argument")
@@ -36,6 +39,20 @@ internal class CliTest {
             .isEqualTo(expectedMechanism)
     }
 
+    @Test
+    fun `should handle dataflow replica id`() {
+        val cliDefault = Cli.configWith(arrayOf())
+        val testReplicaId = "dataflow-id-1"
+        val cli = Cli.configWith(arrayOf("--dataflow-replica-id", testReplicaId))
+
+        expectThat(cliDefault[Cli.dataflowReplicaId]) {
+            isNotEqualTo("seldon-dataflow-engine")
+        }
+        expectThat(cli[Cli.dataflowReplicaId]) {
+            isEqualTo(testReplicaId)
+        }
+    }
+
     companion object {
         @JvmStatic
         private fun saslMechanisms(): Stream<Arguments> {