diff --git a/examples/pom.xml b/examples/pom.xml index 55a0020..a7e7b35 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -55,6 +55,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.batchmap.flatmap.BatchFlatMap @@ -74,6 +77,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction @@ -93,6 +99,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.mapstream.flatmapstream.FlatMapStreamFunction @@ -110,6 +119,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction @@ -195,6 +207,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.map.forward.ForwardFunction @@ -266,6 +281,9 @@ dockerBuild + + amazoncorretto:11 + io.numaproj.numaflow.examples.source.simple.SimpleSource diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java index 83c276f..bca4d4f 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Server.java @@ -6,10 +6,12 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -20,8 +22,10 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create sink gRPC server. @@ -39,8 +43,10 @@ public Server(BatchMapper batchMapper) { * @param batchMapper to process the message */ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) { - this.service = new Service(batchMapper); + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(batchMapper, this.shutdownSignal); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -57,35 +63,55 @@ public void start() throws Exception { Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "server started, listening on socket path: " + grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); } })); + + // if there are any exceptions, shutdown the server gracefully. + shutdownSignal.whenCompleteAsync((v, e) -> { + if (server != null && server.isTerminated()) { + return; + } + + if (e != null) { + System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage()); + try { + log.info("stopping server"); + Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + } catch (InterruptedException ex) { + Thread.interrupted(); + ex.printStackTrace(System.err); + } + } + }); } /** @@ -96,7 +122,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("batch map server is waiting for termination"); server.awaitTermination(); + log.info("batch map server has terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java index b582a9b..fd967cd 100644 --- a/src/main/java/io/numaproj/numaflow/batchmapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/batchmapper/Service.java @@ -35,6 +35,9 @@ class Service extends MapGrpc.MapImplBase { // BatchMapper instance to process the messages private final BatchMapper batchMapper; + // Signal to shut down the gRPC server + private final CompletableFuture shutdownSignal; + // Applies a map function to each datum element in the stream. @Override public StreamObserver mapFn(StreamObserver responseObserver) { @@ -93,8 +96,9 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { datumStream.writeMessage(constructHandlerDatum(mapRequest)); } } catch (Exception e) { - log.error("Encountered an error in batch map", e); - responseObserver.onError(Status.UNKNOWN + log.error("Encountered an error in batch map onNext - {}", e.getMessage()); + shutdownSignal.completeExceptionally(e); + responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) .withCause(e) .asException()); @@ -104,11 +108,12 @@ public void onNext(MapOuterClass.MapRequest mapRequest) { // Called when an error occurs @Override public void onError(Throwable throwable) { - log.error("Error Encountered in batchMap Stream", throwable); - var status = Status.UNKNOWN + log.error("Error Encountered in batchMap Stream - {}", throwable.getMessage()); + shutdownSignal.completeExceptionally(throwable); + responseObserver.onError(Status.INTERNAL .withDescription(throwable.getMessage()) - .withCause(throwable); - responseObserver.onError(status.asException()); + .withCause(throwable) + .asException()); } // Called when the client has finished sending requests diff --git a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java index 6a1649e..3ce663b 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java @@ -46,7 +46,9 @@ class MapSupervisorActor extends AbstractActor { private final Mapper mapper; private final StreamObserver responseObserver; - private final CompletableFuture failureFuture; + private final CompletableFuture shutdownSignal; + private int activeMapperCount; + private Exception userException; public MapSupervisorActor( Mapper mapper, @@ -54,25 +56,28 @@ public MapSupervisorActor( CompletableFuture failureFuture) { this.mapper = mapper; this.responseObserver = responseObserver; - this.failureFuture = failureFuture; + this.shutdownSignal = failureFuture; + this.userException = null; + this.activeMapperCount = 0; } public static Props props( Mapper mapper, StreamObserver responseObserver, - CompletableFuture failureFuture) { - return Props.create(MapSupervisorActor.class, mapper, responseObserver, failureFuture); + CompletableFuture shutdownSignal) { + return Props.create(MapSupervisorActor.class, mapper, responseObserver, shutdownSignal); } @Override public void preRestart(Throwable reason, Optional message) { - log.debug("supervisor pre restart was executed"); - failureFuture.completeExceptionally(reason); - responseObserver.onError(Status.UNKNOWN + getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage()); + shutdownSignal.completeExceptionally(reason); + responseObserver.onError(Status.INTERNAL .withDescription(reason.getMessage()) .withCause(reason) .asException()); Service.mapperActorSystem.stop(getSelf()); + shutdownSignal.completeExceptionally(reason); } @Override @@ -93,18 +98,36 @@ public Receive createReceive() { } private void handleFailure(Exception e) { - responseObserver.onError(Status.UNKNOWN - .withDescription(e.getMessage()) - .withCause(e) - .asException()); - failureFuture.completeExceptionally(e); + log.error("Encountered error in mapFn - {}", e.getMessage()); + if (userException == null) { + userException = e; + // only send the very first exception to the client + // one exception should trigger a container restart + responseObserver.onError(Status.INTERNAL + .withDescription(e.getMessage()) + .withCause(e) + .asException()); + } + activeMapperCount--; } private void sendResponse(MapOuterClass.MapResponse mapResponse) { responseObserver.onNext(mapResponse); + activeMapperCount--; } private void processRequest(MapOuterClass.MapRequest mapRequest) { + if (userException != null) { + log.info("a previous mapper actor failed, not processing any more requests"); + if (activeMapperCount == 0) { + log.info("there is no more active mapper AKKA actors - stopping the system"); + getContext().getSystem().stop(getSelf()); + log.info("AKKA system stopped"); + shutdownSignal.completeExceptionally(userException); + } + return; + } + // Create a MapperActor for each incoming request. ActorRef mapperActor = getContext() .actorOf(MapperActor.props( @@ -112,15 +135,16 @@ private void processRequest(MapOuterClass.MapRequest mapRequest) { // Send the message to the MapperActor. mapperActor.tell(mapRequest, getSelf()); + activeMapperCount++; } // if we see dead letters, we need to stop the execution and exit // to make sure no messages are lost private void handleDeadLetters(AllDeadLetters deadLetter) { log.debug("got a dead letter, stopping the execution"); - responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException()); - failureFuture.completeExceptionally(new Throwable("dead letters")); + responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException()); getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(new Throwable("dead letters")); } @Override @@ -129,8 +153,8 @@ public SupervisorStrategy supervisorStrategy() { return new AllForOneStrategy( DeciderBuilder .match(Exception.class, e -> { - failureFuture.completeExceptionally(e); - responseObserver.onError(Status.UNKNOWN + shutdownSignal.completeExceptionally(e); + responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) .withCause(e) .asException()); diff --git a/src/main/java/io/numaproj/numaflow/mapper/Server.java b/src/main/java/io/numaproj/numaflow/mapper/Server.java index dfc1021..b7376d4 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Server.java @@ -5,10 +5,12 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -19,8 +21,10 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create gRPC server. @@ -38,8 +42,10 @@ public Server(Mapper mapper) { * @param mapper to process the message */ public Server(Mapper mapper, GRPCConfig grpcConfig) { - this.service = new Service(mapper); + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(mapper, this.shutdownSignal); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -61,39 +67,63 @@ public void start() throws Exception { } if (this.server == null) { - ServerBuilder serverBuilder; - // create server builder for domain socket server - serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + // FIXME - this is a workaround to immediately terminate the JVM process + // The correct way to do this is to stop all the actors and wait for them to terminate + System.exit(0); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); } })); + + // if there are any exceptions, shutdown the server gracefully. + shutdownSignal.whenCompleteAsync((v, e) -> { + if (server.isTerminated()) { + return; + } + + if (e != null) { + System.err.println("*** shutting down mapper gRPC server because of an exception - " + e.getMessage()); + try { + log.info("stopping server"); + Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + // FIXME - this is a workaround to immediately terminate the JVM process + // The correct way to do this is to stop all the actors and wait for them to terminate + System.exit(0); + } catch (InterruptedException ex) { + Thread.interrupted(); + ex.printStackTrace(System.err); + } + } + }); } /** @@ -104,7 +134,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("mapper server is waiting for termination"); server.awaitTermination(); + log.info("mapper server has terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/mapper/Service.java b/src/main/java/io/numaproj/numaflow/mapper/Service.java index 150dee5..f8618dc 100644 --- a/src/main/java/io/numaproj/numaflow/mapper/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapper/Service.java @@ -21,18 +21,7 @@ class Service extends MapGrpc.MapImplBase { public static final ActorSystem mapperActorSystem = ActorSystem.create("mapper"); private final Mapper mapper; - - // TODO we need to propagate the exception all the way up and shutdown the server. - static void handleFailure( - CompletableFuture failureFuture) { - new Thread(() -> { - try { - failureFuture.get(); - } catch (Exception e) { - e.printStackTrace(); - } - }).start(); - } + private final CompletableFuture shutdownSignal; @Override public StreamObserver mapFn(final StreamObserver responseObserver) { @@ -43,13 +32,9 @@ public StreamObserver mapFn(final StreamObserver failureFuture = new CompletableFuture<>(); - - handleFailure(failureFuture); - // create a MapSupervisorActor that processes the map requests. ActorRef mapSupervisorActor = mapperActorSystem - .actorOf(MapSupervisorActor.props(mapper, responseObserver, failureFuture)); + .actorOf(MapSupervisorActor.props(mapper, responseObserver, shutdownSignal)); return new StreamObserver<>() { private boolean handshakeDone = false; diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java index d3d576c..543a570 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Server.java @@ -6,10 +6,12 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; import java.util.Collections; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -20,8 +22,10 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create sink gRPC server. @@ -39,8 +43,10 @@ public Server(MapStreamer mapStreamer) { * @param mapStreamer to process the message */ public Server(MapStreamer mapStreamer, GRPCConfig grpcConfig) { - this.service = new Service(mapStreamer); + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(mapStreamer, this.shutdownSignal); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -57,35 +63,55 @@ public void start() throws Exception { Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE)); if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on socket path: " + grpcConfig.getSocketPath()); + "server started, listening on socket path: " + grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); + System.err.println("*** shutting down map streamer gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); } })); + + // if there are any exceptions, shutdown the server gracefully. + shutdownSignal.whenCompleteAsync((v, e) -> { + if (server.isTerminated()) { + return; + } + + if (e != null) { + System.err.println("*** shutting down map streamer gRPC server because of an exception - " + e.getMessage()); + try { + log.info("stopping server"); + Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + } catch (InterruptedException ex) { + Thread.interrupted(); + ex.printStackTrace(System.err); + } + } + }); } /** @@ -96,7 +122,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("map stream server is waiting for termination"); server.awaitTermination(); + log.info("map stream server has terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java index 44d70eb..a096143 100644 --- a/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java +++ b/src/main/java/io/numaproj/numaflow/mapstreamer/Service.java @@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import static io.numaproj.numaflow.map.v1.MapGrpc.getMapFnMethod; @@ -17,6 +18,7 @@ class Service extends MapGrpc.MapImplBase { private final MapStreamer mapStreamer; + private final CompletableFuture shutdownSignal; @Override public StreamObserver mapFn(StreamObserver responseObserver) { @@ -57,9 +59,11 @@ public void onNext(MapOuterClass.MapRequest request) { constructHandlerDatum(request), new OutputObserverImpl(responseObserver)); } catch (Exception e) { - log.error("Error processing message", e); - responseObserver.onError(Status.UNKNOWN + log.error("Encountered error in mapFn onNext - {}", e.getMessage()); + shutdownSignal.completeExceptionally(e); + responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) + .withCause(e) .asException()); return; } @@ -76,11 +80,12 @@ public void onNext(MapOuterClass.MapRequest request) { @Override public void onError(Throwable throwable) { - log.error("Error Encountered in mapStream Stream", throwable); - var status = Status.UNKNOWN + log.error("Encountered error in mapStream Stream - {}", throwable.getMessage()); + shutdownSignal.completeExceptionally(throwable); + responseObserver.onError(Status.INTERNAL .withDescription(throwable.getMessage()) - .withCause(throwable); - responseObserver.onError(status.asException()); + .withCause(throwable) + .asException()); } @Override diff --git a/src/main/java/io/numaproj/numaflow/reducer/Server.java b/src/main/java/io/numaproj/numaflow/reducer/Server.java index 92295c6..478be13 100644 --- a/src/main/java/io/numaproj/numaflow/reducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducer/Server.java @@ -6,6 +6,7 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; @@ -21,6 +22,7 @@ public class Server { private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create gRPC server. @@ -40,6 +42,7 @@ public Server(ReducerFactory reducerFactory) { public Server(ReducerFactory reducerFactory, GRPCConfig grpcConfig) { this.service = new Service(reducerFactory); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -57,33 +60,32 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -99,7 +101,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("reducer server is waiting for termination"); server.awaitTermination(); + log.info("reducer server has terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java index bb1c2d7..941292d 100644 --- a/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java +++ b/src/main/java/io/numaproj/numaflow/reducestreamer/Server.java @@ -8,6 +8,7 @@ import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.reducestreamer.model.ReduceStreamer; import io.numaproj.numaflow.reducestreamer.model.ReduceStreamerFactory; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; @@ -22,6 +23,7 @@ public class Server { private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create gRPC server. @@ -43,6 +45,7 @@ public Server( GRPCConfig grpcConfig) { this.service = new Service(reduceStreamerFactory); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -60,33 +63,32 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -102,7 +104,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("reduce stream server is waiting for termination"); server.awaitTermination(); + log.info("reduce stream server terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java index 7509a5a..4c268b9 100644 --- a/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sessionreducer/Server.java @@ -8,6 +8,7 @@ import io.numaproj.numaflow.info.ServerInfoAccessorImpl; import io.numaproj.numaflow.sessionreducer.model.SessionReducer; import io.numaproj.numaflow.sessionreducer.model.SessionReducerFactory; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; @@ -22,6 +23,7 @@ public class Server { private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create gRPC server. @@ -43,6 +45,7 @@ public Server( GRPCConfig grpcConfig) { this.service = new Service(sessionReducerFactory); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -60,33 +63,32 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -102,7 +104,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("session reduce server is waiting for termination"); server.awaitTermination(); + log.info("session reduce server terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java new file mode 100644 index 0000000..9fe59cd --- /dev/null +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerHelper.java @@ -0,0 +1,110 @@ +package io.numaproj.numaflow.shared; + +import io.grpc.BindableService; +import io.grpc.Context; +import io.grpc.Contexts; +import io.grpc.ForwardingServerCallListener; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.netty.NettyServerBuilder; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.unix.DomainSocketAddress; + +import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_END; +import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_START; +import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_END_TIME; +import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_START_TIME; + +public class GrpcServerHelper { + private EventLoopGroup bossEventLoopGroup; + private EventLoopGroup workerEventLoopGroup; + + public void gracefullyShutdownEventLoopGroups() { + if (this.bossEventLoopGroup != null) { + this.bossEventLoopGroup.shutdownGracefully(); + } + if (this.workerEventLoopGroup != null) { + this.workerEventLoopGroup.shutdownGracefully(); + } + } + + public Server createServer( + String socketPath, + int maxMessageSize, + boolean isLocal, + int port, + BindableService service) { + ServerInterceptor interceptor = new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, + io.grpc.Metadata headers, + ServerCallHandler next) { + + final var context = + Context.current().withValues( + WINDOW_START_TIME, + headers.get(DATUM_METADATA_WIN_START), + WINDOW_END_TIME, + headers.get(DATUM_METADATA_WIN_END)); + ServerCall.Listener listener = Contexts.interceptCall( + context, + call, + headers, + next); + return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>( + listener) { + @Override + public void onHalfClose() { + try { + super.onHalfClose(); + } catch (RuntimeException ex) { + handleException(ex, call, headers); + throw ex; + } + } + + private void handleException( + RuntimeException e, + ServerCall serverCall, + io.grpc.Metadata headers) { + // Currently, we only have application level exceptions. + // Translate it to UNKNOWN status. + var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e); + var newStatus = Status.fromThrowable(status.asException()); + serverCall.close(newStatus, headers); + e.printStackTrace(); + System.exit(1); + } + }; + } + }; + + if (isLocal) { + return ServerBuilder.forPort(port) + .maxInboundMessageSize(maxMessageSize) + .intercept(interceptor) + .addService(service) + .build(); + } + + this.bossEventLoopGroup = GrpcServerUtils.createEventLoopGroup(1, "netty-boss"); + this.workerEventLoopGroup = GrpcServerUtils.createEventLoopGroup( + ThreadUtils.INSTANCE.availableProcessors(), + "netty-worker"); + + return NettyServerBuilder + .forAddress(new DomainSocketAddress(socketPath)) + .channelType(GrpcServerUtils.getChannelTypeClass()) + .maxInboundMessageSize(maxMessageSize) + .bossEventLoopGroup(this.bossEventLoopGroup) + .workerEventLoopGroup(this.workerEventLoopGroup) + .intercept(interceptor) + .addService(service) + .build(); + } +} diff --git a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java index 0ba9c9c..9f90c73 100644 --- a/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java +++ b/src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java @@ -127,72 +127,4 @@ public static void writeServerInfo( log.info("Writing server info {} to {}", serverInfo, infoFilePath); serverInfoAccessor.write(serverInfo, infoFilePath); } - - public static ServerBuilder createServerBuilder( - String socketPath, - int maxMessageSize, - boolean isLocal, - int port) { - ServerInterceptor interceptor = new ServerInterceptor() { - @Override - public ServerCall.Listener interceptCall( - ServerCall call, - io.grpc.Metadata headers, - ServerCallHandler next) { - - final var context = - Context.current().withValues( - WINDOW_START_TIME, - headers.get(DATUM_METADATA_WIN_START), - WINDOW_END_TIME, - headers.get(DATUM_METADATA_WIN_END)); - ServerCall.Listener listener = Contexts.interceptCall( - context, - call, - headers, - next); - return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>( - listener) { - @Override - public void onHalfClose() { - try { - super.onHalfClose(); - } catch (RuntimeException ex) { - handleException(ex, call, headers); - throw ex; - } - } - - private void handleException( - RuntimeException e, - ServerCall serverCall, - io.grpc.Metadata headers) { - // Currently, we only have application level exceptions. - // Translate it to UNKNOWN status. - var status = Status.UNKNOWN.withDescription(e.getMessage()).withCause(e); - var newStatus = Status.fromThrowable(status.asException()); - serverCall.close(newStatus, headers); - e.printStackTrace(); - System.exit(1); - } - }; - } - }; - - if (isLocal) { - return ServerBuilder.forPort(port) - .maxInboundMessageSize(maxMessageSize) - .intercept(interceptor); - } - - return NettyServerBuilder - .forAddress(new DomainSocketAddress(socketPath)) - .channelType(GrpcServerUtils.getChannelTypeClass()) - .maxInboundMessageSize(maxMessageSize) - .bossEventLoopGroup(GrpcServerUtils.createEventLoopGroup(1, "netty-boss")) - .workerEventLoopGroup(GrpcServerUtils.createEventLoopGroup( - ThreadUtils.INSTANCE.availableProcessors(), - "netty-worker")) - .intercept(interceptor); - } } diff --git a/src/main/java/io/numaproj/numaflow/sideinput/Server.java b/src/main/java/io/numaproj/numaflow/sideinput/Server.java index d0c7af0..1407d6b 100644 --- a/src/main/java/io/numaproj/numaflow/sideinput/Server.java +++ b/src/main/java/io/numaproj/numaflow/sideinput/Server.java @@ -6,6 +6,7 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; @@ -21,6 +22,7 @@ public class Server { private final Service service; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create gRPC server. @@ -40,6 +42,7 @@ public Server(SideInputRetriever sideInputRetriever) { public Server(SideInputRetriever sideInputRetriever, GRPCConfig grpcConfig) { this.service = new Service(sideInputRetriever); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -57,33 +60,32 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); @@ -99,7 +101,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("side input server is waiting for termination"); server.awaitTermination(); + log.info("side input server has terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/sinker/Server.java b/src/main/java/io/numaproj/numaflow/sinker/Server.java index 53dbb1a..65c8d7d 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Server.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Server.java @@ -5,9 +5,11 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -18,8 +20,10 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create sink gRPC server. @@ -37,8 +41,10 @@ public Server(Sinker sinker) { * @param sinker sink to process the message */ public Server(Sinker sinker, GRPCConfig grpcConfig) { - this.service = new Service(sinker); + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(sinker, this.shutdownSignal); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -56,38 +62,57 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); + System.err.println("*** shutting down sink gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); } })); + + // if there are any exceptions, shutdown the server gracefully. + shutdownSignal.whenCompleteAsync((v, e) -> { + if (server.isTerminated()) { + return; + } + + if (e != null) { + System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage()); + try { + log.info("stopping server"); + Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + } catch (InterruptedException ex) { + Thread.interrupted(); + ex.printStackTrace(System.err); + } + } + }); } /** @@ -98,7 +123,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("sink server is waiting for termination"); server.awaitTermination(); + log.info("sink server is terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/sinker/Service.java b/src/main/java/io/numaproj/numaflow/sinker/Service.java index 770562e..1cd5201 100644 --- a/src/main/java/io/numaproj/numaflow/sinker/Service.java +++ b/src/main/java/io/numaproj/numaflow/sinker/Service.java @@ -5,6 +5,7 @@ import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.sink.v1.SinkGrpc; import io.numaproj.numaflow.sink.v1.SinkOuterClass; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.time.Instant; @@ -15,6 +16,7 @@ import java.util.concurrent.TimeUnit; @Slf4j +@AllArgsConstructor class Service extends SinkGrpc.SinkImplBase { // sinkTaskExecutor is the executor for the sinker. It is a fixed size thread pool // with the number of threads equal to the number of cores on the machine times 2. @@ -23,10 +25,7 @@ class Service extends SinkGrpc.SinkImplBase { .newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); private final Sinker sinker; - - public Service(Sinker sinker) { - this.sinker = sinker; - } + private final CompletableFuture shutdownSignal; /** * Applies a function to each datum element in the stream. @@ -96,15 +95,20 @@ public void onNext(SinkOuterClass.SinkRequest request) { datumStream.writeMessage(constructHandlerDatum(request)); } } catch (Exception e) { - log.error("Encountered error in sinkFn - {}", e.getMessage()); - responseObserver.onError(e); + log.error("Encountered error in sinkFn onNext - {}", e.getMessage()); + shutdownSignal.completeExceptionally(e); + responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException()); } } @Override public void onError(Throwable throwable) { log.error("Encountered error in sinkFn - {}", throwable.getMessage()); - responseObserver.onError(throwable); + shutdownSignal.completeExceptionally(throwable); + responseObserver.onError(Status.INTERNAL + .withDescription(throwable.getMessage()) + .withCause(throwable) + .asException()); } @Override diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Server.java b/src/main/java/io/numaproj/numaflow/sourcer/Server.java index d4600a0..a91eb5f 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Server.java @@ -6,9 +6,11 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -19,8 +21,10 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create gRPC server. @@ -38,8 +42,10 @@ public Server(Sourcer sourcer) { * @param sourcer Sourcer interface */ public Server(Sourcer sourcer, GRPCConfig grpcConfig) { - this.service = new Service(sourcer); + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(sourcer, this.shutdownSignal); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -57,38 +63,57 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println("*** shutting down gRPC server since JVM is shutting down"); + System.err.println("*** shutting down source gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); } })); + + // if there are any exceptions, shutdown the server gracefully. + shutdownSignal.whenCompleteAsync((v, e) -> { + if (server.isTerminated()) { + return; + } + + if (e != null) { + System.err.println("*** shutting down source gRPC server because of an exception - " + e.getMessage()); + try { + log.info("stopping server"); + Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + } catch (InterruptedException ex) { + Thread.interrupted(); + ex.printStackTrace(System.err); + } + } + }); } /** @@ -99,7 +124,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("waiting for server to terminate"); server.awaitTermination(); + log.info("server has terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/sourcer/Service.java b/src/main/java/io/numaproj/numaflow/sourcer/Service.java index 1714d36..c65d3c6 100644 --- a/src/main/java/io/numaproj/numaflow/sourcer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcer/Service.java @@ -5,10 +5,13 @@ import io.grpc.stub.StreamObserver; import io.numaproj.numaflow.source.v1.SourceGrpc; import io.numaproj.numaflow.source.v1.SourceOuterClass; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod; @@ -16,12 +19,11 @@ /** * Implementation of the gRPC service for the sourcer. */ +@Slf4j +@AllArgsConstructor class Service extends SourceGrpc.SourceImplBase { private final Sourcer sourcer; - - public Service(Sourcer sourcer) { - this.sourcer = sourcer; - } + private final CompletableFuture shutdownSignal; /** * readFn is the endpoint for reading data from the sourcer. @@ -50,35 +52,47 @@ public void onNext(SourceOuterClass.ReadRequest request) { return; } - ReadRequestImpl readRequest = new ReadRequestImpl( - request.getRequest().getNumRecords(), - Duration.ofMillis(request.getRequest().getTimeoutInMs())); - - // Create an observer to write the response back to the client - OutputObserverImpl outputObserver = new OutputObserverImpl(responseObserver); - - // invoke the sourcer's read method - sourcer.read(readRequest, outputObserver); - - // once the read is done, send an EOT message to indicate the client - // that the end of batch has been reached - SourceOuterClass.ReadResponse.Status status = SourceOuterClass.ReadResponse.Status - .newBuilder() - .setEot(true) - .setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS) - .build(); - - SourceOuterClass.ReadResponse response = SourceOuterClass.ReadResponse.newBuilder() - .setStatus(status) - .build(); - - responseObserver.onNext(response); + try { + ReadRequestImpl readRequest = new ReadRequestImpl( + request.getRequest().getNumRecords(), + Duration.ofMillis(request.getRequest().getTimeoutInMs())); + + // Create an observer to write the response back to the client + OutputObserverImpl outputObserver = new OutputObserverImpl(responseObserver); + + // invoke the sourcer's read method + sourcer.read(readRequest, outputObserver); + + // once the read is done, send an EOT message to indicate the client + // that the end of batch has been reached + SourceOuterClass.ReadResponse.Status status = SourceOuterClass.ReadResponse.Status + .newBuilder() + .setEot(true) + .setCode(SourceOuterClass.ReadResponse.Status.Code.SUCCESS) + .build(); + + SourceOuterClass.ReadResponse response = SourceOuterClass.ReadResponse.newBuilder() + .setStatus(status) + .build(); + + responseObserver.onNext(response); + } catch (Exception e) { + log.error("Encountered error in readFn onNext - {}", e.getMessage()); + shutdownSignal.completeExceptionally(e); + responseObserver.onError(Status.INTERNAL + .withDescription(e.getMessage()) + .withCause(e) + .asException()); + } } @Override public void onError(Throwable t) { - responseObserver.onError(Status.UNKNOWN + log.error("Encountered error in readFn onNext - {}", t.getMessage()); + shutdownSignal.completeExceptionally(t); + responseObserver.onError(Status.INTERNAL .withDescription(t.getMessage()) + .withCause(t) .asException()); } @@ -116,31 +130,45 @@ public void onNext(SourceOuterClass.AckRequest request) { return; } - List offsets = new ArrayList<>(request.getRequest().getOffsetsCount()); - for (SourceOuterClass.Offset offset : request.getRequest().getOffsetsList()) { - offsets.add(new Offset( - offset.getOffset().toByteArray(), - offset.getPartitionId())); - } - - AckRequestImpl ackRequest = new AckRequestImpl(offsets); - - // invoke the sourcer's ack method - sourcer.ack(ackRequest); - - // send an ack response to the client after acking the message - SourceOuterClass.AckResponse response = SourceOuterClass.AckResponse - .newBuilder() - .setResult(SourceOuterClass.AckResponse.Result.newBuilder().setSuccess( - Empty.newBuilder().build())) - .build(); + try { + List offsets = new ArrayList<>(request.getRequest().getOffsetsCount()); + for (SourceOuterClass.Offset offset : request.getRequest().getOffsetsList()) { + offsets.add(new Offset( + offset.getOffset().toByteArray(), + offset.getPartitionId())); + } - responseObserver.onNext(response); + AckRequestImpl ackRequest = new AckRequestImpl(offsets); + + // invoke the sourcer's ack method + sourcer.ack(ackRequest); + + // send an ack response to the client after acking the message + SourceOuterClass.AckResponse response = SourceOuterClass.AckResponse + .newBuilder() + .setResult(SourceOuterClass.AckResponse.Result.newBuilder().setSuccess( + Empty.newBuilder().build())) + .build(); + + responseObserver.onNext(response); + } catch (Exception e) { + log.error("Encountered error in ackFn onNext - {}", e.getMessage()); + shutdownSignal.completeExceptionally(e); + responseObserver.onError(Status.INTERNAL + .withDescription(e.getMessage()) + .withCause(e) + .asException()); + } } @Override public void onError(Throwable t) { - responseObserver.onError(t); + log.error("Encountered error in ackFn onNext - {}", t.getMessage()); + shutdownSignal.completeExceptionally(t); + responseObserver.onError(Status.INTERNAL + .withDescription(t.getMessage()) + .withCause(t) + .asException()); } @Override diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java index 2e5e85f..fcf42bc 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Server.java @@ -6,9 +6,11 @@ import io.numaproj.numaflow.info.ContainerType; import io.numaproj.numaflow.info.ServerInfoAccessor; import io.numaproj.numaflow.info.ServerInfoAccessorImpl; +import io.numaproj.numaflow.shared.GrpcServerHelper; import io.numaproj.numaflow.shared.GrpcServerUtils; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; /** @@ -19,8 +21,10 @@ public class Server { private final GRPCConfig grpcConfig; private final Service service; + private final CompletableFuture shutdownSignal; private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper()); private io.grpc.Server server; + private final GrpcServerHelper grpcServerHelper; /** * constructor to create gRPC server. @@ -38,8 +42,10 @@ public Server(SourceTransformer sourceTransformer) { * @param sourceTransformer to transform the message */ public Server(SourceTransformer sourceTransformer, GRPCConfig grpcConfig) { - this.service = new Service(sourceTransformer); + this.shutdownSignal = new CompletableFuture<>(); + this.service = new Service(sourceTransformer, this.shutdownSignal); this.grpcConfig = grpcConfig; + this.grpcServerHelper = new GrpcServerHelper(); } /** @@ -57,38 +63,63 @@ public void start() throws Exception { } if (this.server == null) { - // create server builder - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( + this.server = this.grpcServerHelper.createServer( grpcConfig.getSocketPath(), grpcConfig.getMaxMessageSize(), grpcConfig.isLocal(), - grpcConfig.getPort()); - - // build server - this.server = serverBuilder - .addService(this.service) - .build(); + grpcConfig.getPort(), + this.service); } - // start server server.start(); log.info( - "Server started, listening on {}", + "server started, listening on {}", grpcConfig.isLocal() ? "localhost:" + grpcConfig.getPort() : grpcConfig.getSocketPath()); - // register shutdown hook + // register shutdown hook to gracefully shut down the server Runtime.getRuntime().addShutdownHook(new Thread(() -> { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); + if (server != null && server.isTerminated()) { + return; + } try { Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + // FIXME - this is a workaround to immediately terminate the JVM process + // The correct way to do this is to stop all the actors and wait for them to terminate + System.exit(0); } catch (InterruptedException e) { Thread.interrupted(); e.printStackTrace(System.err); } })); + + // if there are any exceptions, shutdown the server gracefully. + shutdownSignal.whenCompleteAsync((v, e) -> { + if (server.isTerminated()) { + return; + } + + if (e != null) { + System.err.println("*** shutting down sink gRPC server because of an exception - " + e.getMessage()); + try { + log.info("stopping server"); + Server.this.stop(); + log.info("gracefully shutting down event loop groups"); + this.grpcServerHelper.gracefullyShutdownEventLoopGroups(); + // FIXME - this is a workaround to immediately terminate the JVM process + // The correct way to do this is to stop all the actors and wait for them to terminate + System.exit(0); + } catch (InterruptedException ex) { + Thread.interrupted(); + ex.printStackTrace(System.err); + } + } + }); } /** @@ -99,7 +130,9 @@ public void start() throws Exception { * @throws InterruptedException if the current thread is interrupted while waiting */ public void awaitTermination() throws InterruptedException { + log.info("transformer server is waiting for termination"); server.awaitTermination(); + log.info("transformer server has terminated"); } /** diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java index f1824cf..a2c9985 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/Service.java @@ -21,18 +21,7 @@ class Service extends SourceTransformGrpc.SourceTransformImplBase { public static final ActorSystem transformerActorSystem = ActorSystem.create("transformer"); private final SourceTransformer transformer; - - // TODO we need to propagate the exception all the way up and shutdown the server. - static void handleFailure( - CompletableFuture failureFuture) { - new Thread(() -> { - try { - failureFuture.get(); - } catch (Exception e) { - e.printStackTrace(); - } - }).start(); - } + private final CompletableFuture shutdownSignal; @Override public StreamObserver sourceTransformFn(final StreamObserver responseObserver) { @@ -43,16 +32,12 @@ public StreamObserver sourceTransformF responseObserver); } - CompletableFuture failureFuture = new CompletableFuture<>(); - - handleFailure(failureFuture); - // create a TransformSupervisorActor that processes the transform requests. ActorRef transformSupervisorActor = transformerActorSystem .actorOf(TransformSupervisorActor.props( transformer, responseObserver, - failureFuture)); + shutdownSignal)); return new StreamObserver<>() { private boolean handshakeDone = false; diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java index b2d8c2b..ec37b4e 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformSupervisorActor.java @@ -46,22 +46,26 @@ class TransformSupervisorActor extends AbstractActor { private final SourceTransformer transformer; private final StreamObserver responseObserver; - private final CompletableFuture failureFuture; + private final CompletableFuture shutdownSignal; + private int activeTransformersCount; + private Exception userException; /** * Constructor for TransformSupervisorActor. * * @param transformer The transformer to be used for processing the request. * @param responseObserver The StreamObserver to be used for sending the responses. - * @param failureFuture The CompletableFuture to be completed exceptionally in case of any failure. + * @param shutdownSignal The CompletableFuture to be completed exceptionally in case of any failure. */ public TransformSupervisorActor( SourceTransformer transformer, StreamObserver responseObserver, - CompletableFuture failureFuture) { + CompletableFuture shutdownSignal) { this.transformer = transformer; this.responseObserver = responseObserver; - this.failureFuture = failureFuture; + this.shutdownSignal = shutdownSignal; + this.userException = null; + this.activeTransformersCount = 0; } /** @@ -69,19 +73,19 @@ public TransformSupervisorActor( * * @param transformer The transformer to be used for processing the request. * @param responseObserver The StreamObserver to be used for sending the responses. - * @param failureFuture The CompletableFuture to be completed exceptionally in case of any failure. + * @param shutdownSignal The CompletableFuture to be completed exceptionally in case of any failure. * * @return a Props for creating a TransformSupervisorActor. */ public static Props props( SourceTransformer transformer, StreamObserver responseObserver, - CompletableFuture failureFuture) { + CompletableFuture shutdownSignal) { return Props.create( TransformSupervisorActor.class, transformer, responseObserver, - failureFuture); + shutdownSignal); } /** @@ -92,13 +96,13 @@ public static Props props( */ @Override public void preRestart(Throwable reason, Optional message) { - log.debug("supervisor pre restart was executed"); - failureFuture.completeExceptionally(reason); - responseObserver.onError(Status.UNKNOWN + getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage()); + responseObserver.onError(Status.INTERNAL .withDescription(reason.getMessage()) .withCause(reason) .asException()); Service.transformerActorSystem.stop(getSelf()); + shutdownSignal.completeExceptionally(reason); } /** @@ -132,11 +136,17 @@ public Receive createReceive() { * @param e The exception to be handled. */ private void handleFailure(Exception e) { - responseObserver.onError(Status.UNKNOWN - .withDescription(e.getMessage()) - .withCause(e) - .asException()); - failureFuture.completeExceptionally(e); + log.error("Encountered error in sourceTransformFn - {}", e.getMessage()); + if (userException == null) { + userException = e; + // only send the very first exception to the client + // one exception should trigger a container restart + responseObserver.onError(Status.INTERNAL + .withDescription(e.getMessage()) + .withCause(e) + .asException()); + } + activeTransformersCount--; } /** @@ -146,6 +156,7 @@ private void handleFailure(Exception e) { */ private void sendResponse(Sourcetransformer.SourceTransformResponse transformResponse) { responseObserver.onNext(transformResponse); + activeTransformersCount--; } /** @@ -154,6 +165,16 @@ private void sendResponse(Sourcetransformer.SourceTransformResponse transformRes * @param transformRequest The SourceTransformRequest to be processed. */ private void processRequest(Sourcetransformer.SourceTransformRequest transformRequest) { + if (userException != null) { + log.info("a previous transformer actor failed, not processing any more requests"); + if (activeTransformersCount == 0) { + log.info("there is no more active transformer AKKA actors - stopping the system"); + getContext().getSystem().stop(getSelf()); + log.info("AKKA system stopped"); + shutdownSignal.completeExceptionally(userException); + } + return; + } // Create a TransformerActor for each incoming request. ActorRef transformerActor = getContext() .actorOf(TransformerActor.props( @@ -161,6 +182,7 @@ private void processRequest(Sourcetransformer.SourceTransformRequest transformRe // Send the message to the TransformerActor. transformerActor.tell(transformRequest, getSelf()); + activeTransformersCount++; } /** @@ -170,9 +192,9 @@ private void processRequest(Sourcetransformer.SourceTransformRequest transformRe */ private void handleDeadLetters(AllDeadLetters deadLetter) { log.debug("got a dead letter, stopping the execution"); - responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException()); - failureFuture.completeExceptionally(new Throwable("dead letters")); + responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException()); getContext().getSystem().stop(getSelf()); + shutdownSignal.completeExceptionally(new Throwable("dead letters")); } /** @@ -186,8 +208,7 @@ public SupervisorStrategy supervisorStrategy() { return new AllForOneStrategy( DeciderBuilder .match(Exception.class, e -> { - failureFuture.completeExceptionally(e); - responseObserver.onError(Status.UNKNOWN + responseObserver.onError(Status.INTERNAL .withDescription(e.getMessage()) .withCause(e) .asException()); diff --git a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java index 2863e6a..05a1f22 100644 --- a/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java +++ b/src/main/java/io/numaproj/numaflow/sourcetransformer/TransformerActor.java @@ -114,6 +114,4 @@ private Sourcetransformer.SourceTransformResponse buildResponse( }); return responseBuilder.setId(ID).build(); } - - } diff --git a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java index ebb93bf..3ba1fd1 100644 --- a/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/batchmapper/ServerErrTest.java @@ -135,7 +135,7 @@ public void testErrorFromUDF() { fail("Expected exception not thrown"); } catch (InterruptedException | ExecutionException e) { assertEquals( - "UNKNOWN: java.lang.RuntimeException: unknown exception", + "INTERNAL: java.lang.RuntimeException: unknown exception", e.getCause().getMessage()); } } diff --git a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java index db08a22..ec398c2 100644 --- a/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapper/ServerErrTest.java @@ -129,7 +129,7 @@ public void testMapperFailure() { fail("Expected exception not thrown"); } catch (Exception e) { assertEquals( - "io.grpc.StatusRuntimeException: UNKNOWN: unknown exception", + "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", e.getMessage()); } } diff --git a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java index afca6e0..9559e2a 100644 --- a/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/mapstreamer/ServerErrTest.java @@ -128,7 +128,7 @@ public void TestMapStreamerErr() { fail("Should have thrown an exception"); } catch (Exception e) { assertEquals( - "io.grpc.StatusRuntimeException: UNKNOWN: unknown exception", + "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", e.getMessage()); } } diff --git a/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java b/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java index a4ac662..8286713 100644 --- a/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java +++ b/src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java @@ -36,16 +36,6 @@ public void testWriteServerInfo() throws Exception { .write(Mockito.any(), Mockito.eq("infoFilePath")); } - @Test - public void testCreateServerBuilder() { - ServerBuilder serverBuilder = GrpcServerUtils.createServerBuilder( - "socketPath", - 1000, - false, - 50051); - Assert.assertNotNull(serverBuilder); - } - @Test public void testWindowStartTime() { Context.Key windowStartTime = GrpcServerUtils.WINDOW_START_TIME; diff --git a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java index 1c39766..cc8a44d 100644 --- a/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java +++ b/src/test/java/io/numaproj/numaflow/sourcetransformer/ServerErrTest.java @@ -138,7 +138,7 @@ public void testSourceTransformerFailure() { fail("Expected exception not thrown"); } catch (Exception e) { assertEquals( - "io.grpc.StatusRuntimeException: UNKNOWN: unknown exception", + "io.grpc.StatusRuntimeException: INTERNAL: unknown exception", e.getMessage()); } }