diff --git a/eclair-core/pom.xml b/eclair-core/pom.xml index 5ac2492181..24fa89cdad 100644 --- a/eclair-core/pom.xml +++ b/eclair-core/pom.xml @@ -196,7 +196,7 @@ org.zeromq jeromq - 0.5.0 + 0.5.2 diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala index 06854651c3..970bd9d6e7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala @@ -41,6 +41,7 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] subscriber.monitor("inproc://events", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED) subscriber.connect(address) subscriber.subscribe(topic.getBytes(ZMQ.CHARSET)) + subscriber.setTCPKeepAlive(1) val monitor = ctx.createSocket(SocketType.PAIR) monitor.connect("inproc://events") @@ -49,18 +50,18 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] // we check messages in a non-blocking manner with an interval, making sure to retrieve all messages before waiting again @tailrec - final def checkEvent: Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match { + final def checkEvent(): Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match { case Some(event) => self ! event - checkEvent + checkEvent() case None => () } @tailrec - final def checkMsg: Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match { + final def checkMsg(): Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match { case Some(msg) => self ! msg - checkMsg + checkMsg() case None => () } @@ -69,11 +70,11 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] override def receive: Receive = { case Symbol("checkEvent") => - checkEvent + checkEvent() context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkEvent")) case Symbol("checkMsg") => - checkMsg + checkMsg() context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkMsg")) case event: Event => event.getEvent match { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index 48ce5ce7d6..bf89df3fed 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -84,6 +84,22 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind } } + test("reconnect ZMQ automatically") { + withWatcher(f => { + import f._ + + // When the watcher starts, it broadcasts the current height. + val block1 = listener.expectMsgType[CurrentBlockCount] + listener.expectNoMessage(100 millis) + + restartBitcoind(probe) + generateBlocks(1) + val block2 = listener.expectMsgType[CurrentBlockCount] + assert(block2.blockCount === block1.blockCount + 1) + listener.expectNoMessage(100 millis) + }) + } + test("add/remove watches from/to utxo map") { val m0 = Map.empty[OutPoint, Set[Watch[_ <: WatchTriggered]]] val txid = randomBytes32