diff --git a/core/src/main/scala/akka/kafka/javadsl/Consumer.scala b/core/src/main/scala/akka/kafka/javadsl/Consumer.scala index bac60ddf4..7ed4a3f9a 100644 --- a/core/src/main/scala/akka/kafka/javadsl/Consumer.scala +++ b/core/src/main/scala/akka/kafka/javadsl/Consumer.scala @@ -92,7 +92,12 @@ object Consumer { def drainAndShutdown(ec: Executor): CompletionStage[T] = control.drainAndShutdown(streamCompletion, ec) - override def isShutdown: CompletionStage[Done] = control.isShutdown + override val isShutdown: CompletionStage[Done] = + control.isShutdown.thenCompose[Done] { (_: Any) => // Scala 2.12 needs the types here + streamCompletion.thenApply[Done] { (_: Any) => // Scala 2.12 needs the types here + Done.done() + } + } override def getMetrics: CompletionStage[java.util.Map[MetricName, Metric]] = control.getMetrics } diff --git a/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala b/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala index b03caa098..e0b6df54c 100644 --- a/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala +++ b/core/src/main/scala/akka/kafka/scaladsl/Consumer.scala @@ -117,7 +117,7 @@ object Consumer { */ def drainAndShutdown()(implicit ec: ExecutionContext): Future[T] = control.drainAndShutdown(streamCompletion)(ec) - override def isShutdown: Future[Done] = + override val isShutdown: Future[Done] = control.isShutdown .flatMap(_ => streamCompletion)(ExecutionContexts.parasitic) .map(_ => Done)(ExecutionContexts.parasitic) diff --git a/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala index 4d3aafaa2..00163761f 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/ControlSpec.scala @@ -29,7 +29,7 @@ object ControlSpec { shutdownCalled.set(true) shutdownFuture } - override def isShutdown: Future[Done] = ??? + override def isShutdown: Future[Done] = Future.failed(new NotImplementedError("")) override def metrics: Future[Map[MetricName, Metric]] = ??? } }