From 1fbede761854c15a71834c150db3a106d6199b57 Mon Sep 17 00:00:00 2001 From: Bastien Teinturier <31281497+t-bast@users.noreply.github.com> Date: Mon, 17 May 2021 15:32:25 +0200 Subject: [PATCH] Add TCP keep-alive on ZMQ socket (#1807) One of ZMQ's drawbacks is that subscribers on an unreliable network may silently disconnect from publishers in case of network failures. In our case, we want to reconnect immediately when that happens, so we set a tcp keep-alive to ensure this. Fixes #1789 --- eclair-core/pom.xml | 2 +- .../blockchain/bitcoind/zmq/ZMQActor.scala | 13 +++++++------ .../blockchain/bitcoind/ZmqWatcherSpec.scala | 16 ++++++++++++++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/eclair-core/pom.xml b/eclair-core/pom.xml index 5ac2492181..24fa89cdad 100644 --- a/eclair-core/pom.xml +++ b/eclair-core/pom.xml @@ -196,7 +196,7 @@ org.zeromq jeromq - 0.5.0 + 0.5.2 diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala index 06854651c3..970bd9d6e7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/zmq/ZMQActor.scala @@ -41,6 +41,7 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] subscriber.monitor("inproc://events", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED) subscriber.connect(address) subscriber.subscribe(topic.getBytes(ZMQ.CHARSET)) + subscriber.setTCPKeepAlive(1) val monitor = ctx.createSocket(SocketType.PAIR) monitor.connect("inproc://events") @@ -49,18 +50,18 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] // we check messages in a non-blocking manner with an interval, making sure to retrieve all messages before waiting again @tailrec - final def checkEvent: Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match { + final def checkEvent(): Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match { case Some(event) => self ! event - checkEvent + checkEvent() case None => () } @tailrec - final def checkMsg: Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match { + final def checkMsg(): Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match { case Some(msg) => self ! msg - checkMsg + checkMsg() case None => () } @@ -69,11 +70,11 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]] override def receive: Receive = { case Symbol("checkEvent") => - checkEvent + checkEvent() context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkEvent")) case Symbol("checkMsg") => - checkMsg + checkMsg() context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkMsg")) case event: Event => event.getEvent match { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index 48ce5ce7d6..bf89df3fed 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -84,6 +84,22 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind } } + test("reconnect ZMQ automatically") { + withWatcher(f => { + import f._ + + // When the watcher starts, it broadcasts the current height. + val block1 = listener.expectMsgType[CurrentBlockCount] + listener.expectNoMessage(100 millis) + + restartBitcoind(probe) + generateBlocks(1) + val block2 = listener.expectMsgType[CurrentBlockCount] + assert(block2.blockCount === block1.blockCount + 1) + listener.expectNoMessage(100 millis) + }) + } + test("add/remove watches from/to utxo map") { val m0 = Map.empty[OutPoint, Set[Watch[_ <: WatchTriggered]]] val txid = randomBytes32