Skip to content

Commit

Permalink
feat: grpc handle incoming and outgoing channels
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Jun 20, 2024
1 parent 3850d7d commit 75ebef7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
4 changes: 2 additions & 2 deletions proto/index.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import "empty.proto";
import "intermediate.proto";

message ChannelData {
string target_uri = 1;
string data = 2;
string destination_uri = 1;
bytes data = 2;
}

service Runner {
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/runner/Runner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ abstract class Runner {
)

/* Messages which are destined to a processor inside the runner. */
private val incoming: Channel<Payload> = Channel()
protected val incoming: Channel<Payload> = Channel()

/* Message which must be transmitted to the outside world. */
protected val outgoing: Channel<Payload> = Channel()
Expand Down
30 changes: 28 additions & 2 deletions src/main/kotlin/runner/impl/GRPCRunner.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package technology.idlab.runner.impl

import EmptyOuterClass.Empty
import Index.ChannelData as GRPCChannelData
import Intermediate as GRPC
import RunnerGrpcKt
import com.google.protobuf.ByteString
import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import runner.Runner
import technology.idlab.parser.intermediate.IRArgument
import technology.idlab.parser.intermediate.IRParameter
Expand Down Expand Up @@ -83,10 +88,31 @@ abstract class GRPCRunner(host: String, port: Int) : Runner() {

/** Create a single stub for all communication. */
private val grpc: RunnerGrpcKt.RunnerCoroutineStub
private val parseIncoming: Flow<GRPCChannelData>
private val parseOutgoing: Flow<Unit>

init {
val builder = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
grpc = RunnerGrpcKt.RunnerCoroutineStub(builder)
// Initialize the GRPC stub.
val connection = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
grpc = RunnerGrpcKt.RunnerCoroutineStub(connection)

// Create a flow for incoming messages.
parseIncoming =
flow<GRPCChannelData> {
for (message in this@GRPCRunner.incoming) {
val builder = GRPCChannelData.newBuilder()
builder.setDestinationUri(message.destinationURI)
builder.setData(ByteString.copyFrom(message.data))
emit(builder.build())
}
}

// Emit outgoing messages.
parseOutgoing =
grpc.channel(parseIncoming).map {
val message = Runner.Payload(it.destinationUri, it.data.toByteArray())
this.outgoing.send(message)
}
}

override suspend fun prepare(processor: IRProcessor) {
Expand Down

0 comments on commit 75ebef7

Please sign in to comment.