Skip to content

Commit

Permalink
Merge pull request #446 from wavesplatform/node-93-rebroadcast-tx
Browse files Browse the repository at this point in the history
NODE-93: do not rebroadcast existing tx
  • Loading branch information
ismagin authored Aug 10, 2017
2 parents f73b647 + 4108449 commit afb08b7
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/com/wavesplatform/Coordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ object Coordinator extends ScorexLogging {
_ <- Either.cond(isForkValidWithCheckpoint(commonBlockHeight), (), GenericError("Fork contains block that doesn't match checkpoint, declining fork"))
droppedTransactions <- blockchainUpdater.removeAfter(lastCommonBlockId)
score <- forkApplicationResultEi
_ = droppedTransactions.foreach(t => utxStorage.putIfNew(t))
} yield {
droppedTransactions.foreach(utxStorage.putIfNew)
miner.lastBlockChanged()
updateBlockchainReadinessFlag(history, time, blockchainReadiness, settings.minerSettings.intervalAfterLastBlockThenGenerationIsAllowed)
score
Expand Down
31 changes: 18 additions & 13 deletions src/main/scala/com/wavesplatform/UtxPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,24 @@ class UtxPool(time: Time,
}
}

def putIfNew(tx: Transaction): Either[ValidationError, Transaction] = write { implicit l =>
if (transactions().size >= utxSettings.maxSize) {
Left(GenericError("Transaction pool size limit is reached"))
} else knownTransactions().get(tx.id, () => {
val validationResult = for {
_ <- feeCalculator.enoughFee(tx)
diff <- TransactionDiffer(fs, history.lastBlock.map(_.timestamp), time.correctedTime(), stateReader.height)(stateReader, tx)
_ = pessimisticPortfolios.mutate(_.add(tx.id, diff))
_ = transactions.transform(_.updated(tx.id, tx))
} yield tx

validationResult
})
def putIfNew(tx: Transaction): Either[ValidationError, Boolean] = write { implicit l =>
knownTransactions.mutate(cache =>
Option(cache.getIfPresent(tx.id)) match {
case Some(Right(_)) => Right(false)
case Some(Left(er)) => Left(er)
case None =>
val res = for {
_ <- Either.cond(transactions().size < utxSettings.maxSize, (), GenericError("Transaction pool size limit is reached"))
_ <- feeCalculator.enoughFee(tx)
diff <- TransactionDiffer(fs, history.lastBlock.map(_.timestamp), time.correctedTime(), stateReader.height)(stateReader, tx)
} yield {
pessimisticPortfolios.mutate(_.add(tx.id, diff))
transactions.transform(_.updated(tx.id, tx))
tx
}
cache.put(tx.id, res)
res.right.map(_ => true)
})
}

def removeAll(tx: Traversable[Transaction]): Unit = write { implicit l =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class OrderBookActor(assetPair: AssetPair,

def fullCommands: Receive = readOnlyCommands orElse snapshotsCommands orElse executeCommands

def executeCommands: Receive = {
def executeCommands: Receive = {
case order: Order =>
onAddOrder(order)
case cancel: CancelOrder =>
Expand Down Expand Up @@ -233,7 +233,10 @@ class OrderBookActor(assetPair: AssetPair,
None

case event@OrderExecuted(o, c) =>
createTransaction(o, c).flatMap(utx.putIfNew(_)) match {
(for {
tx <- createTransaction(o, c)
_ <- utx.putIfNew(tx)
} yield tx) match {
case Right(tx) if tx.isInstanceOf[ExchangeTransaction] =>
allChannels.broadcast(RawBytes(TransactionMessageSpec.messageCode, tx.bytes))
processEvent(event)
Expand Down Expand Up @@ -324,5 +327,6 @@ object OrderBookActor {
case class Snapshot(orderBook: OrderBook)

case object ValidationTimeoutExceeded

}

Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ class UtxPoolSynchronizer(utx: UtxPool, allChannels: ChannelGroup)
log.debug(s"${id(ctx)} Error processing transaction ${t.id}: $e")
case Left(e) =>
log.debug(s"${id(ctx)} Error processing transaction ${t.id}: $e")
case Right(_) =>
case Right(true) =>
allChannels.broadcast(RawBytes(TransactionMessageSpec.messageCode, t.bytes), Some(ctx.channel()))
log.trace(s"${id(ctx)} Added transaction ${t.id} to UTX pool")
case Right(false) =>
log.trace(s"${id(ctx)} TX ${t.id} already known")
})
case _ => super.channelRead(ctx, msg)
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/com/wavesplatform/network/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.netty.channel.{Channel, ChannelHandlerContext}
import io.netty.util.NetUtil.toSocketAddressString
import io.netty.util.concurrent.{EventExecutorGroup, ScheduledFuture}
import scorex.block.Block
import scorex.transaction.Transaction
import scorex.utils.ScorexLogging

import scala.concurrent.duration._
Expand Down Expand Up @@ -55,5 +56,8 @@ package object network extends ScorexLogging {
log.trace(s"Broadcasting $message to ${allChannels.size()} channels${except.fold("")(c => s" (except ${id(c)})")}")
allChannels.writeAndFlush(message, except.fold(ChannelMatchers.all())(ChannelMatchers.isNot))
}

def broadcastTx(tx:Transaction, except: Option[Channel] = None): Unit =
allChannels.broadcast(RawBytes(TransactionMessageSpec.messageCode, tx.bytes), except)
}
}
25 changes: 16 additions & 9 deletions src/main/scala/scorex/BroadcastRoute.scala
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
package scorex

import com.wavesplatform.UtxPool
import com.wavesplatform.network.{RawBytes, TransactionMessageSpec}
import com.wavesplatform.network._
import io.netty.channel.group.ChannelGroup
import scorex.api.http.ApiError
import scorex.transaction.{Transaction, ValidationError}
import com.wavesplatform.network._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

trait BroadcastRoute {
def utx: UtxPool

def allChannels: ChannelGroup

protected def doBroadcast(v: Either[ValidationError, Transaction]): Future[Either[ApiError, Transaction]] =
Future(v.flatMap(t => utx.putIfNew(t)
.map(t => {
allChannels.broadcast(RawBytes(TransactionMessageSpec.messageCode, t.bytes))
t
})).left.map(ApiError.fromValidationError))
import scala.concurrent.ExecutionContext.Implicits.global

protected def doBroadcast(v: Either[ValidationError, Transaction]): Future[Either[ApiError, Transaction]] = Future {
(for {
tx <- v
utxResult <- utx.putIfNew(tx)
} yield {
if (utxResult) {
allChannels.broadcastTx(tx, None)
}
tx
}).left.map(ApiError.fromValidationError)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class PaymentRouteSpec extends RouteSpec("/payment")
with MockFactory with PropertyChecks with RestAPISettingsHelper with TestWallet with TransactionGen {

private val utx = stub[UtxPool]
(utx.putIfNew _).when(*).onCall((t: Transaction) => Right(t)).anyNumberOfTimes()
(utx.putIfNew _).when(*).onCall((t: Transaction) => Right(true)).anyNumberOfTimes()
private val allChannels = stub[ChannelGroup]
private implicit def noShrink[A]: Shrink[A] = Shrink(_ => Stream.empty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class OrderBookActorSpecification extends TestKit(ActorSystem("MatcherTest"))
val functionalitySettings = stub[FunctionalitySettings]

val utx = stub[UtxPool]
(utx.putIfNew _).when(*).onCall((tx: Transaction) => Right(tx))
(utx.putIfNew _).when(*).onCall((tx: Transaction) => Right(true))
val allChannels = stub[ChannelGroup]
actor = system.actorOf(Props(new OrderBookActor(pair, orderHistoryRef, storedState,
wallet, utx, allChannels, settings, history, functionalitySettings) with RestartableActor))
Expand Down Expand Up @@ -275,7 +275,7 @@ class OrderBookActorSpecification extends TestKit(ActorSystem("MatcherTest"))
(pool.putIfNew _).when(*).onCall { (tx: Transaction) =>
tx match {
case om: ExchangeTransaction if om.buyOrder == ord2 => Left(ValidationError.GenericError("test"))
case _: Transaction => Right(tx)
case _: Transaction => Right(true)
}
}
val allChannels = stub[ChannelGroup]
Expand Down

0 comments on commit afb08b7

Please sign in to comment.