Skip to content

Commit

Permalink
refactor: minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Sep 25, 2024
1 parent f1635ed commit f179a0f
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 110 deletions.
6 changes: 6 additions & 0 deletions orchestrator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ dependencies {
implementation("io.arrow-kt:arrow-core:1.2.4")
implementation("io.arrow-kt:arrow-fx-coroutines:1.2.4")

// Koin dependency injection.
implementation(project.dependencies.platform("io.insert-koin:koin-bom:4.0.0"))
implementation("io.insert-koin:koin-core")
testImplementation("io.insert-koin:koin-test:4.0.0")
testImplementation("io.insert-koin:koin-test-junit5:4.0.0")

// gRPC
implementation("io.grpc:grpc-netty:1.64.0")
implementation("io.grpc:grpc-protobuf:1.64.0")
Expand Down
10 changes: 1 addition & 9 deletions orchestrator/src/main/kotlin/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package technology.idlab
import java.io.File
import kotlin.system.exitProcess
import kotlinx.coroutines.runBlocking
import technology.idlab.orchestrator.Orchestrator
import technology.idlab.orchestrator.impl.SimpleOrchestrator
import technology.idlab.parser.impl.jena.JenaParser
import technology.idlab.resolver.impl.GenericResolver
Expand Down Expand Up @@ -125,16 +124,9 @@ internal suspend fun exec(path: String) {
val parser = JenaParser(listOf(listOf(config), files).flatten())

// Start the orchestrator.
val pipeline = parser.pipelines().single()
val runners = parser.runners()
val orchestrator = SimpleOrchestrator(pipeline.stages, runners)
val orchestrator = SimpleOrchestrator(parser)
orchestrator.exec()

// Check the result.
if (orchestrator.status != Orchestrator.Status.SUCCESS) {
throw PipelineException(pipeline.uri)
}

Log.shared.info("Pipeline execution succeeded.")
}

Expand Down
5 changes: 5 additions & 0 deletions orchestrator/src/main/kotlin/broker/Broker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package technology.idlab.broker
/**
* A broker is a simple class which takes in messages targeting a specific URI, and routes them to
* all registered receivers.
*
* All receivers must be known during the construction of the broker, but can be removed at any
* other point in time.
*
* @param T The type of the data to send.
*/
interface Broker<T> {
/**
Expand Down
4 changes: 3 additions & 1 deletion orchestrator/src/main/kotlin/broker/BrokerClient.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package technology.idlab.broker

/**
* The `BrokerReceiver` may receive messages from the broker which are targeted to specific URIs.
* A `BrokerReceiver` may receive messages from the broker which are targeted to specific URIs.
*
* @param T The type of the data to receive.
*/
interface BrokerClient<T> {
/** The URI of the receiver. */
Expand Down
19 changes: 0 additions & 19 deletions orchestrator/src/main/kotlin/orchestrator/Orchestrator.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,6 @@ package technology.idlab.orchestrator

/** The orchestrator handles inter-runner communication and the execution of the pipeline. */
interface Orchestrator {
/** An indication of the current status of the orchestrator. */
enum class Status {
/** The orchestrator has been created, but not initialised. */
CREATED,
/** Preparing the orchestrator for execution. */
INITIALISING,
/** Ready for execution. */
READY,
/** The orchestrator is running. */
RUNNING,
/** A failure occurred, and the pipeline has exited. */
FAILED,
/** The pipeline has finished. */
SUCCESS,
}

/** The current status of the orchestrator. */
val status: Status

/** Execute the pipeline. */
suspend fun exec()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,77 +6,51 @@ import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import technology.idlab.broker.Broker
import technology.idlab.broker.impl.SimpleBroker
import technology.idlab.intermediate.IRRunner
import technology.idlab.intermediate.IRStage
import technology.idlab.orchestrator.Orchestrator
import technology.idlab.orchestrator.Orchestrator.Status
import technology.idlab.parser.Parser
import technology.idlab.runner.Runner
import technology.idlab.util.Log

/**
* A simple implementation of an orchestrator which only succeeds if all runners succeed without
* intervention.
*/
class SimpleOrchestrator(
/** All stages in the pipeline. */
stages: List<IRStage>,
/** List of all runners. */
runners: List<IRRunner>
/** Configuration of the pipeline. */
parser: Parser
) : Orchestrator {
/** Message broker. */
private val broker: Broker<ByteArray>

/** Runners by URI. */
private val runners: Map<String, Runner>

/** The current status of the runner. */
override var status = Status.CREATED
set(value) {
Log.shared.debug { "Orchestrator status changed to: $value" }
field = value
}
/** All the runners used in the pipeline. */
private val runners: List<Runner>

/** Load all stages into their respective runners. */
init {
this.status = Status.INITIALISING

// Associate the URI of the runner to the runner itself, as well as the set of stages.
val result: Map<String, Pair<IRRunner, MutableSet<IRStage>>> =
runners.associateBy { it.uri }.mapValues { (_, runner) -> Pair(runner, mutableSetOf()) }
val result = mutableListOf<Runner>()

// Add every stage to it's corresponding runner.
for (stage in stages) {
result[stage.processor.target]!!.second.add(stage)
for (runner in parser.runners()) {
val stages = parser.stages(runner)
result.add(Runner.from(runner, stages))
}

// Instantiate the runners.
this.runners =
result.mapValues {
val (runner, runnerStages) = it.value
Runner.from(runner, runnerStages)
}

// Register broker.
broker = SimpleBroker(this.runners.values)
this.runners = result
}

// Ready for execution.
status = Status.READY
/** Initialize a broker. */
init {
this.broker = SimpleBroker(this.runners)
}

/** Execute all stages in all the runtimes. */
override suspend fun exec() = coroutineScope {
status = Status.RUNNING

// Execute all runners in parallel.
val executions = mutableListOf<Deferred<Unit>>()
for ((_, runner) in runners) {
for (runner in runners) {
executions.add(async { runner.exec() })
}

// Wait for all runners to finish.
try {
executions.awaitAll()
} catch (e: Exception) {
status = Status.FAILED
return@coroutineScope
}

status = Status.SUCCESS
executions.awaitAll()
return@coroutineScope
}
}
17 changes: 17 additions & 0 deletions orchestrator/src/main/kotlin/parser/Parser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import technology.idlab.intermediate.IRPackage
import technology.idlab.intermediate.IRPipeline
import technology.idlab.intermediate.IRProcessor
import technology.idlab.intermediate.IRRunner
import technology.idlab.intermediate.IRStage

/**
* Parse an RDF file into an intermediate representation, and validate it against the ontology and
Expand All @@ -25,4 +26,20 @@ interface Parser {

/** List of all known dependencies. */
fun dependencies(): List<IRDependency>

/**
* Get a runner by its URI.
*
* @param id The URI of the runner.
* @return The runner with the given URI.
*/
fun runner(uri: String): IRRunner

/**
* Get all stages for a given runner.
*
* @param runner The runner to get stages for.
* @return The stages for the given runner.
*/
fun stages(runner: IRRunner): List<IRStage>
}
9 changes: 9 additions & 0 deletions orchestrator/src/main/kotlin/parser/impl/jena/JenaParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,13 @@ class JenaParser(
val uris = model.listObjectsOfProperty(RDFC.dependency).toList()
return uris.map { IRDependency(it.toString()) }
}

override fun runner(uri: String): IRRunner {
val resource = model.getResource(uri)
return model.runner(resource)
}

override fun stages(runner: IRRunner): List<IRStage> {
return model.pipelines().flatMap { it.stages }.filter { it.processor.target == runner.uri }
}
}
34 changes: 0 additions & 34 deletions orchestrator/src/test/kotlin/SimpleOrchestratorTest.kt

This file was deleted.

0 comments on commit f179a0f

Please sign in to comment.