-
Notifications
You must be signed in to change notification settings - Fork 440
akka-sample-kafka-to-sharding-scala doesn't work as expected #219
Comments
Thanks for the report! This is a problem with the combination of alpakka-kafka and the actorcontext protection feature added in Akka 2.6.6 (akka/akka#27112) This particular problem has been fixed in alpakka-kafka in https://github.com/akka/alpakka-kafka/pull/1138/files, and a similar problem in the example has been fixed in #218. For now the workaround is to downgrade to Akka 2.6.5 - but we hope to release alpakka-kafka 2.0.4 soon, which will also fix the issue. |
Thank you for prompt response! Actually downgrading to Akka 2.6.5 doesn't work - cluster doesn't start. As working workaround could confirm just commenting out all ctx.log.info("...") calls in processor module. It saves complete use case scenario in working state. |
I can confirm the sample does not work. I have just checked out the project and ran all steps according to the README.md Versions used: val AkkaVersion = "2.6.8" Tested with JDKs: openjdk version "11.0.7" 2020-04-14 LTS java version "1.8.0_261" sbt "kafka / run"
sbt "processor / run 2551 8551 8081"
sbt "producer / run"
After running the producer, I get a weird error on the Processor :
It stops and does nothing. I can also confirm that the workaround of downgrading Akka to 2.6.5 produces the following error, preventing the cluster to start:
Downgrading Akka to 2.6.6 produces the same behaviour of 2.6.8 |
Versions used
val AkkaVersion = "2.6.6"
val AlpakkaKafkaVersion = "2.0.3"
val AkkaManagementVersion = "1.0.5"
val AkkaHttpVersion = "10.1.11"
val KafkaVersion = "2.4.0"
val LogbackVersion = "1.2.3"
Expected Behavior
[info] [2020-01-16 09:48:51,040] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [1] assigned to current node. Updating shard allocation
[info] [2020-01-16 09:48:51,040] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [25] assigned to current node. Updating shard allocation
[info] [2020-01-16 09:48:51,043] [INFO] [akka://KafkaToSharding/user/kafka-event-processor/rebalancerRef] - Partition [116] assigned to current node. Updating shard allocation
After producer started, in the single processor node the messages should be:
[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 29->45
[info] [2020-01-16 09:51:38,672] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 29 to cluster sharding
[info] [2020-01-16 09:51:38,673] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-26] [akka://KafkaToSharding/system/sharding/user-processing/75/29] - user 29 purchase cat t-shirt, quantity 0, price 8874
[info] [2020-01-16 09:51:39,702] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 60->111
[info] [2020-01-16 09:51:39,703] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 60 to cluster sharding
[info] [2020-01-16 09:51:39,706] [INFO] [sample.sharding.kafka.UserEvents$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/system/sharding/user-processing/2/60] - user 60 purchase cat t-shirt, quantity 2, price 9375
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - entityId->partition 75->1
[info] [2020-01-16 09:51:40,732] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-17] [akka://KafkaToSharding/user/kafka-event-processor] - Forwarding message for entity 75 to cluster sharding
Actual Behavior
[info] [2020-07-08 18:56:40,587] [INFO] [akka.actor.RepointableActorRef] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-consumer-1] - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594] to Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Relevant logs
~/ScalaProjects/akka-sample-kafka-to-sharding-scala $ sbt "processor / run 2551 8551 8081"
[info] Loading global plugins from /home/eugene/.sbt/1.0/plugins
[info] Loading settings for project akka-sample-kafka-to-sharding-scala-build from plugins.sbt ...
[info] Loading project definition from /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/project
[info] Loading settings for project akka-sample-kafka-to-sharding from build.sbt ...
[info] Set current project to akka-sample-kafka-to-sharding (in build file:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/)
[info] Compiling 2 protobuf files to /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main,/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Compiling schema /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto
[info] Compiling schema /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto
protoc-jar: protoc version: 3.7.1, detected platform: linux-x86_64 (linux/amd64)
protoc-jar: embedded: bin/3.7.1/protoc-3.7.1-linux-x86_64.exe
protoc-jar: executing: [/tmp/protocjar5741868647544472609/bin/protoc.exe, --plugin=protoc-gen-scala_0=/tmp/protocbridge4871978644069516449, --plugin=protoc-gen-akka-grpc-scaladsl-trait_1=/tmp/protocbridge4453544094864461486, --plugin=protoc-gen-akka-grpc-scaladsl-client_2=/tmp/protocbridge2444707610667080398, --plugin=protoc-gen-akka-grpc-scaladsl-server_3=/tmp/protocbridge1023303769398452281, --scala_0_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-trait_1_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-client_2_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, --akka-grpc-scaladsl-server_3_out=flat_package:/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto, -I/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external, /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/users.proto, /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/protobuf/user-events.proto]le / protocGenerate 0s
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/src/main/proto: warning: directory does not exist.
/home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/protobuf_external: warning: directory does not exist.
[info] Generating Akka gRPC service interface for sample.sharding.kafka.UserService
[info] Generating Akka gRPC client for sample.sharding.kafka.UserService
[info] Generating Akka gRPC service handler for sample.sharding.kafka.UserService
[info] Compiling protobuf
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Protoc target directory: /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main
[info] Compiling 15 Scala sources to /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/classes ...
[warn] /home/eugene/ScalaProjects/akka-sample-kafka-to-sharding-scala/processor/target/scala-2.13/src_managed/main/sample/sharding/kafka/UserServiceClient.scala:36:91: class ActorMaterializer in package stream is deprecated (since 2.6.0): The Materializer now has all methods the ActorMaterializer used to have
[warn] private val clientState = new ClientState(settings, akka.event.Logging(mat.asInstanceOf[ActorMaterializer].system, this.getClass))
[warn] ^
[warn] one warning found
[info] running (fork) sample.sharding.kafka.Main 2551 8551 8081
[info] [2020-07-08 18:55:32,397] [INFO] [akka.event.slf4j.Slf4jLogger] [KafkaToSharding-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
[error] SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
[error] SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
[error] SLF4J: See also http://www.slf4j.org/codes.html#replay
[info] [2020-07-08 18:55:32,610] [INFO] [akka.remote.artery.tcp.ArteryTcpTransport] [KafkaToSharding-akka.actor.default-dispatcher-3] [ArteryTcpTransport(akka://KafkaToSharding)] - Remoting started with transport [Artery tcp]; listening on address [akka://KafkaToSharding@127.0.0.1:2551] with UID [1857486658713966714]
[info] [2020-07-08 18:55:32,629] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Starting up, Akka version [2.6.6] ...
[info] [2020-07-08 18:55:32,739] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[info] [2020-07-08 18:55:32,739] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-3] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Started up successfully
[info] [2020-07-08 18:55:32,786] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - SBR started. Config: stableAfter: 20000 ms, strategy: KeepMajority, selfUniqueAddress: UniqueAddress(akka://KafkaToSharding@127.0.0.1:2551,1857486658713966714), selfDc: default
[info] [2020-07-08 18:55:33,314] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [HealthChecksImpl(akka://KafkaToSharding)] - Loading readiness checks List(NamedHealthCheck(cluster-membership,akka.management.cluster.scaladsl.ClusterMembershipCheck))
[info] [2020-07-08 18:55:33,315] [INFO] [akka.management.internal.HealthChecksImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [HealthChecksImpl(akka://KafkaToSharding)] - Loading liveness checks List()
[info] [2020-07-08 18:55:33,401] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], control stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-07-08 18:55:33,401] [WARN] [akka.stream.Materializer] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.stream.Log(akka://KafkaToSharding/system/Materializers/StreamSupervisor-1)] - [outbound connection to [akka://KafkaToSharding@127.0.0.1:2552], message stream] Upstream failed, cause: StreamTcpException: Tcp command [Connect(127.0.0.1:2552,None,List(),Some(5000 milliseconds),true)] failed because of java.net.ConnectException: Connection refused
[info] [2020-07-08 18:55:33,416] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Binding Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-07-08 18:55:33,479] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for ClusterHttpManagementRouteProvider
[info] [2020-07-08 18:55:33,521] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-14] [AkkaManagement(akka://KafkaToSharding)] - Including HTTP management routes for HealthCheckRoutes
[info] [2020-07-08 18:55:34,053] [INFO] [akka.management.scaladsl.AkkaManagement] [KafkaToSharding-akka.actor.default-dispatcher-21] [AkkaManagement(akka://KafkaToSharding)] - Bound Akka Management (HTTP) endpoint to: 127.0.0.1:8551
[info] [2020-07-08 18:55:34,498] [INFO] [akka.actor.ActorSystemImpl] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.actor.ActorSystemImpl(KafkaToSharding)] - Retrieved 128 partitions for topic 'user-events'
[info] [2020-07-08 18:55:34,498] [INFO] [akka.actor.typed.ActorSystem] [KafkaToSharding-akka.actor.default-dispatcher-5] [] - Message extractor created. Initializing sharding
[info] [2020-07-08 18:55:34,520] [INFO] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [KafkaToSharding-akka.actor.default-dispatcher-14] [ClusterSharding(akka://KafkaToSharding)] - Starting Shard Region [user-processing]...
[info] [2020-07-08 18:55:34,549] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/user] - Sharding has started
[info] [2020-07-08 18:55:34,556] [INFO] [akka.cluster.sharding.ShardRegion] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processing] - user-processing: Idle entities will be passivated after [2.000 min]
[info] [2020-07-08 18:55:37,891] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Node [akka://KafkaToSharding@127.0.0.1:2551] is JOINING itself (with roles [dc-default]) and forming new cluster
[info] [2020-07-08 18:55:37,892] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[info] [2020-07-08 18:55:37,898] [INFO] [akka.cluster.Cluster] [KafkaToSharding-akka.actor.default-dispatcher-5] [Cluster(akka://KafkaToSharding)] - Cluster Node [akka://KafkaToSharding@127.0.0.1:2551] - Leader is moving node [akka://KafkaToSharding@127.0.0.1:2551] to [Up]
[info] [2020-07-08 18:55:37,903] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/user] - Member has joined the cluster
[info] [2020-07-08 18:55:37,904] [INFO] [sample.sharding.kafka.Main$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/user] - Sharding started and joined cluster. Starting event processor
[info] [2020-07-08 18:55:37,908] [INFO] [akka.cluster.sbr.SplitBrainResolver] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding/system/cluster/core/daemon/downingProvider] - This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[info] [2020-07-08 18:55:37,917] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - Singleton manager starting singleton actor [akka://KafkaToSharding/system/sharding/user-processingCoordinator/singleton]
[info] [2020-07-08 18:55:37,920] [INFO] [akka.cluster.singleton.ClusterSingletonManager] [KafkaToSharding-akka.actor.default-dispatcher-3] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator] - ClusterSingletonManager state change [Start -> Oldest]
[info] [2020-07-08 18:55:37,936] [INFO] [akka.cluster.sharding.DDataShardCoordinator] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding@127.0.0.1:2551/system/sharding/user-processingCoordinator/singleton/coordinator] - ShardCoordinator was moved to the active state State(Map())
[info] [2020-07-08 18:55:37,942] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-21] [SingleSourceLogic(akka://KafkaToSharding)] - [117ff] Starting. StageActor Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594]
[info] [2020-07-08 18:55:42,312] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing] - Consumer group 'user-processing' assigned topic partitions to cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-07-08 18:55:42,443] [ERROR] [akka.dispatch.Dispatcher] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka.dispatch.Dispatcher] - Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing#-748606125]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].
[info] java.lang.UnsupportedOperationException: Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing#-748606125]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].
[info] at akka.actor.typed.internal.ActorContextImpl.checkCurrentActorThread(ActorContextImpl.scala:317)
[info] at akka.actor.typed.internal.ActorContextImpl.checkCurrentActorThread$(ActorContextImpl.scala:305)
[info] at akka.actor.typed.internal.adapter.ActorContextAdapter.checkCurrentActorThread(ActorContextAdapter.scala:49)
[info] at akka.actor.typed.internal.ActorContextImpl.log(ActorContextImpl.scala:161)
[info] at akka.actor.typed.internal.ActorContextImpl.log$(ActorContextImpl.scala:160)
[info] at akka.actor.typed.internal.adapter.ActorContextAdapter.log(ActorContextAdapter.scala:49)
[info] at akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$.$anonfun$apply$4(KafkaClusterSharding.scala:314)
[info] at akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$.$anonfun$apply$4$adapted(KafkaClusterSharding.scala:312)
[info] at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:447)
[info] at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:56)
[info] at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:93)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info] at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
[info] at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:93)
[info] at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
[info] at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
[info] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
[info] at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
[info] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
[info] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
[info] [2020-07-08 18:55:52,896] [WARN] [akka.remote.artery.Association] [KafkaToSharding-akka.actor.default-dispatcher-14] [Association(akka://KafkaToSharding)] - Outbound control stream to [akka://KafkaToSharding@127.0.0.1:2552] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://KafkaToSharding@127.0.0.1:2552] did not complete within 20000 ms
[info] [2020-07-08 18:56:40,565] [INFO] [akka.kafka.internal.SingleSourceLogic] [KafkaToSharding-akka.actor.default-dispatcher-5] [SingleSourceLogic(akka://KafkaToSharding)] - [117ff] Completing
[info] [2020-07-08 18:56:40,566] [INFO] [sample.sharding.kafka.UserEventsKafkaProcessor$] [KafkaToSharding-akka.actor.default-dispatcher-14] [akka://KafkaToSharding/user/kafka-event-processor] - Consumer stopped Failure(java.lang.UnsupportedOperationException: Unsupported access to ActorContext from the outside of Actor[akka://KafkaToSharding/user/kafka-event-processor#1200911144]. No message is currently processed by the actor, but ActorContext was called from Thread[KafkaToSharding-akka.actor.default-dispatcher-14,5,main].)
[info] [2020-07-08 18:56:40,569] [INFO] [akka.kafka.cluster.sharding.KafkaClusterSharding$RebalanceListener$] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-cluster-sharding-rebalance-listener-user-processing] - Consumer group 'user-processing' revoked topic partitions from cluster member 'akka://KafkaToSharding@127.0.0.1:2551': [user-events-16,user-events-29,user-events-44,user-events-101,user-events-112,user-events-94,user-events-114,user-events-83,user-events-0,user-events-13,user-events-75,user-events-8,user-events-72,user-events-37,user-events-42,user-events-108,user-events-34,user-events-64,user-events-58,user-events-61,user-events-113,user-events-87,user-events-119,user-events-77,user-events-71,user-events-95,user-events-50,user-events-24,user-events-124,user-events-53,user-events-47,user-events-20,user-events-106,user-events-122,user-events-27,user-events-104,user-events-7,user-events-115,user-events-3,user-events-15,user-events-4,user-events-23,user-events-19,user-events-88,user-events-84,user-events-11,user-events-68,user-events-38,user-events-105,user-events-118,user-events-33,user-events-49,user-events-67,user-events-100,user-events-62,user-events-111,user-events-121,user-events-127,user-events-41,user-events-54,user-events-57,user-events-78,user-events-10,user-events-14,user-events-2,user-events-6,user-events-117,user-events-123,user-events-107,user-events-98,user-events-126,user-events-81,user-events-21,user-events-5,user-events-32,user-events-97,user-events-96,user-events-74,user-events-66,user-events-103,user-events-40,user-events-110,user-events-26,user-events-85,user-events-79,user-events-69,user-events-48,user-events-93,user-events-63,user-events-92,user-events-56,user-events-55,user-events-45,user-events-18,user-events-82,user-events-120,user-events-31,user-events-35,user-events-39,user-events-99,user-events-90,user-events-109,user-events-125,user-events-76,user-events-80,user-events-43,user-events-12,user-events-1,user-events-25,user-events-116,user-events-17,user-events-28,user-events-70,user-events-36,user-events-89,user-events-73,user-events-51,user-events-60,user-events-102,user-events-9,user-events-65,user-events-91,user-events-30,user-events-59,user-events-86,user-events-46,user-events-52,user-events-22]
[info] [2020-07-08 18:56:40,587] [INFO] [akka.actor.RepointableActorRef] [KafkaToSharding-akka.actor.default-dispatcher-5] [akka://KafkaToSharding/system/kafka-consumer-1] - Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://KafkaToSharding/system/Materializers/StreamSupervisor-0/$$a#1729317594] to Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://KafkaToSharding/system/kafka-consumer-1#1483160912] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Reproducible Test Case
Please provide a PR with a failing test.
If the issue is more complex or requires configuration, please provide a link to a project that reproduces the issue.
The text was updated successfully, but these errors were encountered: