Skip to content

Commit

Permalink
refactor: remove parser as a dependency for orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Oct 29, 2024
1 parent 8956747 commit f843f38
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 20 deletions.
2 changes: 1 addition & 1 deletion orchestrator/rdfc-cli/src/main/kotlin/Exec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal suspend fun exec(path: String) {
val parser = JenaParser(listOf(listOf(config), indexFiles).flatten())

// Start the orchestrator.
val orchestrator = SimpleOrchestrator(parser)
val orchestrator = SimpleOrchestrator(parser.runners(), parser.stages())
orchestrator.exec()

Log.shared.info("Pipeline execution succeeded.")
Expand Down
1 change: 0 additions & 1 deletion orchestrator/rdfc-orchestrator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ dependencies {
// Local dependencies
implementation(project(":rdfc-core"))
implementation(project(":rdfc-processor"))
implementation(project(":rdfc-parser"))

// gRPC
implementation("io.grpc:grpc-netty:1.64.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,42 @@ import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import technology.idlab.rdfc.core.intermediate.IRRunner
import technology.idlab.rdfc.core.intermediate.IRStage
import technology.idlab.rdfc.orchestrator.broker.Broker
import technology.idlab.rdfc.orchestrator.broker.simple.SimpleBroker
import technology.idlab.rdfc.orchestrator.orchestrator.Orchestrator
import technology.idlab.rdfc.orchestrator.runner.Runner
import technology.idlab.rdfc.parser.Parser

/**
* A simple implementation of an orchestrator which only succeeds if all runners succeed without
* intervention.
*
* @param parser The parser which is used to parse the configuration.
* @param runners All the runners which should be instantiated before loading the stages.
* @param stages A collection of stages which should be run. Note that all the corresponding runners
* should be included in the `runner` declaration.
*/
class SimpleOrchestrator(parser: Parser) : Orchestrator {
class SimpleOrchestrator(runners: Collection<IRRunner>, stages: Collection<IRStage>) :
Orchestrator {
/** Message broker. */
private val broker: Broker<ByteArray>

/** All the runners used in the pipeline. */
private val runners: List<Runner>

/** Load all stages into their respective runners. */
init {
val runners = mutableListOf<Runner>()
// Initialize all runners with their corresponding stages.
val instances = mutableListOf<Runner>()

for (runner in parser.runners()) {
val stages = parser.stages(runner)
runners.add(Runner.from(runner, stages))
for (runner in runners) {
val targets = stages.filter { it.processor.target == runner.uri }
val instance = Runner.from(runner, targets)
instances.add(instance)
}

this.runners = runners
}

/** Initialize a broker. */
init {
this.broker = SimpleBroker(this.runners)
// Initialize a broker.
this.broker = SimpleBroker(instances)
this.runners = instances
}

/** Execute all stages in all the runtimes. */
Expand Down
4 changes: 2 additions & 2 deletions orchestrator/rdfc-parser/src/main/kotlin/Parser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ interface Parser {
/**
* Get all stages for a given runner.
*
* @param runner The runner to get stages for.
* @param runner The runner to get stages for. If none is provided, all stages will be returned.
* @return The stages for the given runner.
*/
fun stages(runner: IRRunner): List<IRStage>
fun stages(runner: IRRunner? = null): List<IRStage>
}
10 changes: 8 additions & 2 deletions orchestrator/rdfc-parser/src/main/kotlin/impl/JenaParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,13 @@ class JenaParser(
return IRRunner(runner.toString(), wd, entrypoint.toString(), type)
}

override fun stages(runner: IRRunner): List<IRStage> {
return pipelines().flatMap { it.stages }.filter { it.processor.target == runner.uri }
override fun stages(runner: IRRunner?): List<IRStage> {
val result = pipelines().flatMap { it.stages }

return if (runner != null) {
return result.filter { it.processor.target == runner.uri }
} else {
result
}
}
}

0 comments on commit f843f38

Please sign in to comment.