Skip to content

Commit

Permalink
Merge pull request #660 from wavesplatform/node-312-and-coeval
Browse files Browse the repository at this point in the history
block headers and coeval
  • Loading branch information
Tolsi authored Nov 7, 2017
2 parents cc10fab + b220f0a commit b27f5e7
Show file tree
Hide file tree
Showing 98 changed files with 592 additions and 748 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class NarrowTransactionGenerator(settings: Settings,

val tradeAssetDistribution = {
tradeAssetIssue +: accounts.map(acc => {
TransferTransaction.create(Some(tradeAssetIssue.id), issueTransactionSender, acc, 5, System.currentTimeMillis(), None, 100000, Array.fill(r.nextInt(100))(r.nextInt().toByte)).right.get
TransferTransaction.create(Some(tradeAssetIssue.id()), issueTransactionSender, acc, 5, System.currentTimeMillis(), None, 100000, Array.fill(r.nextInt(100))(r.nextInt().toByte)).right.get
})
}

Expand Down Expand Up @@ -86,7 +86,7 @@ class NarrowTransactionGenerator(settings: Settings,
val asset = randomFrom(validIssueTxs)
asset.map(issue => {
val pk = accounts.find(_ == issue.sender).get
(pk, Some(issue.id))
(pk, Some(issue.id()))
})
} else Some(randomFrom(accounts).get, None)
senderAndAssetOpt.flatMap { case (sender, asset) =>
Expand All @@ -97,17 +97,17 @@ class NarrowTransactionGenerator(settings: Settings,
val reissuable = r.nextBoolean()
randomFrom(reissuableIssueTxs).flatMap(assetTx => {
val sender = accounts.find(_.address == assetTx.sender.address).get
logOption(ReissueTransaction.create(sender, assetTx.id, Random.nextInt(Int.MaxValue), reissuable, moreThatStandartFee, ts))
logOption(ReissueTransaction.create(sender, assetTx.id(), Random.nextInt(Int.MaxValue), reissuable, moreThatStandartFee, ts))
})
case TransactionType.BurnTransaction =>
randomFrom(validIssueTxs).flatMap(assetTx => {
val sender = accounts.find(_.address == assetTx.sender.address).get
logOption(BurnTransaction.create(sender, assetTx.id, Random.nextInt(1000), moreThatStandartFee, ts))
logOption(BurnTransaction.create(sender, assetTx.id(), Random.nextInt(1000), moreThatStandartFee, ts))
})
case TransactionType.ExchangeTransaction =>
val matcher = randomFrom(accounts).get
val seller = randomFrom(accounts).get
val pair = AssetPair(None, Some(tradeAssetIssue.id))
val pair = AssetPair(None, Some(tradeAssetIssue.id()))
val sellOrder = Order.sell(seller, matcher, pair, 100000000, 1, ts, ts + 30.days.toMillis, moreThatStandartFee * 3)
val buyer = randomFrom(accounts).get
val buyOrder = Order.buy(buyer, matcher, pair, 100000000, 1, ts, ts + 1.day.toMillis, moreThatStandartFee * 3)
Expand All @@ -121,7 +121,7 @@ class NarrowTransactionGenerator(settings: Settings,
case TransactionType.LeaseCancelTransaction =>
randomFrom(activeLeaseTransactions).flatMap(lease => {
val sender = accounts.find(_.address == lease.sender.address).get
logOption(LeaseCancelTransaction.create(sender, lease.id, moreThatStandartFee * 3, ts))
logOption(LeaseCancelTransaction.create(sender, lease.id(), moreThatStandartFee * 3, ts))
})
case TransactionType.CreateAliasTransaction =>
val sender = randomFrom(accounts).get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Worker(settings: Settings,
private def sendTransactions(startStep: Int, channel: Channel): Result[Unit] = {
def loop(step: Int): Result[Unit] = {
log.info(s"[$node] Iteration $step")
val messages: Seq[RawBytes] = generator.next.map(tx => RawBytes(25.toByte, tx.bytes)).toSeq
val messages: Seq[RawBytes] = generator.next.map(tx => RawBytes(25.toByte, tx.bytes())).toSeq

def trySend: Result[Unit] = EitherT {
sender
Expand Down
4 changes: 2 additions & 2 deletions src/it/scala/com/wavesplatform/it/api/NodeApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ trait NodeApi {
postJson("/assets/issue", IssueRequest(address, name, description, quantity, decimals, reissuable, fee)).as[Transaction]

def placeOrder(order: Order): Future[MatcherResponse] =
matcherPost("/matcher/orderbook", order.json).as[MatcherResponse]
matcherPost("/matcher/orderbook", order.json()).as[MatcherResponse]

def expectIncorrectOrderPlacement(order: Order, expectedStatusCode: Int, expectedStatus: String): Future[Boolean] =
matcherPost("/matcher/orderbook", order.json) transform {
matcherPost("/matcher/orderbook", order.json()) transform {
case Failure(UnexpectedStatusCodeException(_, r)) if r.getStatusCode == expectedStatusCode =>
Try(parse(r.getResponseBody).as[MatcherStatusResponse]) match {
case Success(mr) if mr.status == expectedStatus => Success(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ class SimpleTransactionsSuite extends FunSuite with BeforeAndAfterAll with Match
100000L,
System.currentTimeMillis()).right.get
val f = for {
_ <- node.sendByNetwork(RawBytes(TransactionMessageSpec.messageCode, tx.bytes))
_ <- node.sendByNetwork(RawBytes(TransactionMessageSpec.messageCode, tx.bytes()))
_ <- Future.successful(Thread.sleep(2000))

height <- traverse(nodes)(_.height).map(_.max)
_ <- traverse(nodes)(_.waitForHeight(height + 1))
tx <- node.waitForTransaction(tx.id.base58)
tx <- node.waitForTransaction(tx.id().base58)
} yield {
tx shouldBe NodeApi.Transaction(tx.`type`, tx.id, tx.fee, tx.timestamp)
}
Expand All @@ -56,9 +56,9 @@ class SimpleTransactionsSuite extends FunSuite with BeforeAndAfterAll with Match
100000L,
System.currentTimeMillis() + (1 days).toMillis).right.get
val f = for {
_ <- node.sendByNetwork(RawBytes(TransactionMessageSpec.messageCode, tx.bytes))
_ <- node.sendByNetwork(RawBytes(TransactionMessageSpec.messageCode, tx.bytes()))
_ <- Future.successful(Thread.sleep(2000))
_ <- Future.sequence(nodes.map(_.ensureTxDoesntExist(tx.id.base58)))
_ <- Future.sequence(nodes.map(_.ensureTxDoesntExist(tx.id().base58)))
} yield ()
Await.result(f, 60.seconds)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TransferTransactionSpecification(override val allNodes: Seq[Node], overrid
Array.emptyByteArray
).right.get

val invalidTxId = invalidByTsTx.id
val invalidTxId = invalidByTsTx.id()

val invalidByTsSignedRequest = createSignedTransferRequest(invalidByTsTx)

Expand Down
5 changes: 3 additions & 2 deletions src/main/scala/com/wavesplatform/Coordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.wavesplatform.settings.{BlockchainSettings, FunctionalitySettings, Wa
import com.wavesplatform.state2._
import com.wavesplatform.state2.reader.SnapshotStateReader
import kamon.Kamon
import monix.eval.Coeval
import org.influxdb.dto.Point
import scorex.block.{Block, MicroBlock}
import scorex.consensus.TransactionsOrdering
Expand All @@ -34,7 +35,7 @@ object Coordinator extends ScorexLogging with Instrumented {
def isForkValidWithCheckpoint(lastCommonHeight: Int): Boolean =
extension.zipWithIndex.forall(p => checkpoint.isBlockValid(p._1.signerData.signature, lastCommonHeight + 1 + p._2))

lazy val forkApplicationResultEi: Either[ValidationError, BigInt] = {
val forkApplicationResultEi = Coeval.evalOnce {
val firstDeclined = extension.view.map { b =>
b -> appendBlock(checkpoint, history, blockchainUpdater, stateReader(), utxStorage, time, settings.blockchainSettings, featureProvider)(b).right.map {
_.foreach(bh => BlockStats.applied(b, BlockStats.Source.Ext, bh))
Expand Down Expand Up @@ -74,7 +75,7 @@ object Coordinator extends ScorexLogging with Instrumented {
(for {
commonHeightAndDroppedBlocks <- droppedBlocksEi
(commonBlockHeight, droppedBlocks) = commonHeightAndDroppedBlocks
score <- forkApplicationResultEi.left.map((_, droppedBlocks))
score <- forkApplicationResultEi().left.map((_, droppedBlocks))
} yield (commonBlockHeight, droppedBlocks, score))
.right.map { case ((commonBlockHeight, droppedBlocks, score)) =>
val depth = initalHeight - commonBlockHeight
Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/com/wavesplatform/UtxPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,16 @@ class UtxPool(time: Time,
.asScala
.filter(isExpired)
.foreach { tx =>
transactions.remove(tx.id)
pessimisticPortfolios.remove(tx.id)
transactions.remove(tx.id())
pessimisticPortfolios.remove(tx.id())
utxPoolSizeStats.decrement()
}
}

def putIfNew(tx: Transaction): Either[ValidationError, Boolean] = {
putRequestStats.increment()
measureSuccessful(processingTimeStats, {
Option(knownTransactions.getIfPresent(tx.id)) match {
Option(knownTransactions.getIfPresent(tx.id())) match {
case Some(Right(_)) => Right(false)
case Some(Left(er)) => Left(er)
case None =>
Expand All @@ -71,18 +71,18 @@ class UtxPool(time: Time,
diff <- blocking(TransactionDiffer(fs, blocking(history.lastBlockTimestamp()), time.correctedTime(), s.height)(s, tx))
} yield {
utxPoolSizeStats.increment()
pessimisticPortfolios.add(tx.id, diff)
transactions.put(tx.id, tx)
pessimisticPortfolios.add(tx.id(), diff)
transactions.put(tx.id(), tx)
tx
}
knownTransactions.put(tx.id, res)
knownTransactions.put(tx.id(), res)
res.right.map(_ => true)
}
})
}

def removeAll(txs: Traversable[Transaction]): Unit = {
txs.view.map(_.id).foreach { id =>
txs.view.map(_.id()).foreach { id =>
knownTransactions.invalidate(id)
Option(transactions.remove(id)).foreach(_ => utxPoolSizeStats.decrement())
pessimisticPortfolios.remove(id)
Expand Down Expand Up @@ -122,7 +122,7 @@ class UtxPool(time: Time,
case Right(_) =>
(invalid, valid, diff)
case Left(_) =>
(tx.id +: invalid, valid, diff)
(tx.id() +: invalid, valid, diff)
}
case (r, _) => r
}
Expand Down
10 changes: 6 additions & 4 deletions src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.wavesplatform.settings.{FeaturesSettings, FunctionalitySettings}
import com.wavesplatform.state2.{BlockDiff, ByteStr, DataTypes, VariablesStorage, VersionableStorage}
import com.wavesplatform.utils._
import kamon.Kamon
import scorex.block.Block
import scorex.block.{Block, BlockHeader}
import scorex.transaction.History.BlockchainScore
import scorex.transaction.ValidationError.GenericError
import scorex.transaction._
Expand Down Expand Up @@ -64,16 +64,16 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re

if ((height() == 0) || (this.lastBlock.get.uniqueId == block.reference)) consensusValidation.map { blockDiff =>
val h = height() + 1
val score = (if (height() == 0) BigInt(0) else this.score()) + block.blockScore
blockBodyByHeight.mutate(_.put(h, block.bytes))
val score = (if (height() == 0) BigInt(0) else this.score()) + block.blockScore()
blockBodyByHeight.mutate(_.put(h, block.bytes()))
scoreByHeight.mutate(_.put(h, score))
blockIdByHeight.mutate(_.put(h, block.uniqueId))
heightByBlockId.mutate(_.put(block.uniqueId, h))
featuresState.mutate(_.putAll(acceptedFeatures.diff(featuresState().keySet.asScala).map(_ -> h).toMap.asJava))
alterVotes(h, block.featureVotes, 1)
db.commit()
blockHeightStats.record(h)
blockSizeStats.record(block.bytes.length)
blockSizeStats.record(block.bytes().length)
transactionsInBlockStats.record(block.transactionData.size)

if (h % 100 == 0) db.compact(CompactFillRate, CompactMemorySize)
Expand Down Expand Up @@ -134,6 +134,8 @@ class HistoryWriterImpl private(file: Option[File], val synchronizationToken: Re

override def blockAt(height: Int): Option[Block] = blockBytes(height).map(Block.parseBytes(_).get)

override def blockHeaderAndSizeAt(height: Int): Option[(BlockHeader, Int)] =
blockBytes(height).map(bytes => (BlockHeader.parseBytes(bytes).get._1, bytes.length))
}

object HistoryWriterImpl extends ScorexLogging {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/wavesplatform/http/ApiMarshallers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ trait ApiMarshallers {
implicit val aem: TRM[ApiError] = fromStatusCodeAndValue[StatusCode, JsValue].compose { ae => ae.code -> ae.json }
implicit val vem: TRM[ValidationError] = aem.compose(ve => ApiError.fromValidationError(ve))

implicit val tw: Writes[Transaction] = Writes(_.json)
implicit val tw: Writes[Transaction] = Writes(_.json())

private val jsonStringUnmarshaller =
Unmarshaller.byteStringUnmarshaller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class MatcherTransactionWriter(val settings: MatcherSettings)
}

private def saveExchangeTx(tx: ExchangeTransaction) = {
val txId = tx.id.toString
transactions.put(txId, tx.bytes)
saveOrder2TxId(tx.buyOrder.idStr, txId)
saveOrder2TxId(tx.sellOrder.idStr, txId)
val txId = tx.id().toString
transactions.put(txId, tx.bytes())
saveOrder2TxId(tx.buyOrder.idStr(), txId)
saveOrder2TxId(tx.sellOrder.idStr(), txId)
}
}

Expand All @@ -60,7 +60,7 @@ object MatcherTransactionWriter {

case class GetTransactionsByOrder(orderId: String)
case class GetTransactionsResponse(txs: Seq[ExchangeTransaction]) extends MatcherResponse {
val json = JsArray(txs.map(_.json))
val json = JsArray(txs.map(_.json()))
val code = StatusCodes.OK
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class OrderBookActor(assetPair: AssetPair,
orderBook.asks.values.++(orderBook.bids.values).flatten.filterNot(x => {
val validation = x.order.isValid(ts)
validation
}).map(_.order.idStr).foreach(x => handleValidateCancelResult(Right(x)))
}).map(_.order.idStr()).foreach(x => handleValidateCancelResult(Right(x)))
}

private def handleValidateCancelResult(res: Either[GenericError, String]): Unit = {
Expand Down Expand Up @@ -150,7 +150,7 @@ class OrderBookActor(assetPair: AssetPair,
log.debug(s"Order rejected: $err.err")
apiSender.foreach(_ ! OrderRejected(err.err))
case Right(o) =>
log.debug(s"Order accepted: ${o.idStr}, trying to match ...")
log.debug(s"Order accepted: ${o.idStr()}, trying to match ...")
apiSender.foreach(_ ! OrderAccepted(o))
matchOrder(LimitOrder(o))
}
Expand Down Expand Up @@ -222,14 +222,14 @@ class OrderBookActor(assetPair: AssetPair,
_ <- utx.putIfNew(tx)
} yield tx) match {
case Right(tx) if tx.isInstanceOf[ExchangeTransaction] =>
allChannels.broadcast(RawBytes(TransactionMessageSpec.messageCode, tx.bytes))
allChannels.broadcast(RawBytes(TransactionMessageSpec.messageCode, tx.bytes()))
processEvent(event)
context.system.eventStream.publish(ExchangeTransactionCreated(tx.asInstanceOf[ExchangeTransaction]))
if (event.submittedRemaining > 0)
Some(o.partial(event.submittedRemaining))
else None
case Left(ex) =>
log.info("Can't create tx for o1: " + Json.prettyPrint(o.order.json) + "\n, o2: " + Json.prettyPrint(c.order.json))
log.info("Can't create tx for o1: " + Json.prettyPrint(o.order.json()) + "\n, o2: " + Json.prettyPrint(c.order.json()))
processInvalidTransaction(event, ex)
}
case _ => None
Expand Down Expand Up @@ -294,7 +294,7 @@ object OrderBookActor {
case object OrderCleanup

case class OrderAccepted(order: Order) extends MatcherResponse {
val json = Json.obj("status" -> "OrderAccepted", "message" -> order.json)
val json = Json.obj("status" -> "OrderAccepted", "message" -> order.json())
val code = StatusCodes.OK
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ object Events {
def createOrderInfo(event: Event): Map[String, OrderInfo] = {
event match {
case OrderAdded(lo) =>
Map(lo.order.idStr->
Map(lo.order.idStr()->
OrderInfo(lo.order.amount, 0L, false))
case oe: OrderExecuted =>
val (o1, o2) = (oe.submittedExecuted, oe.counterExecuted)
Map(o1.order.idStr -> OrderInfo(o1.order.amount, o1.amount, false),
o2.order.idStr -> OrderInfo(o2.order.amount, o2.amount, false)
Map(o1.order.idStr() -> OrderInfo(o1.order.amount, o1.amount, false),
o2.order.idStr() -> OrderInfo(o2.order.amount, o2.amount, false)
)
case OrderCanceled(lo) =>
Map(lo.order.idStr->
Map(lo.order.idStr()->
OrderInfo(lo.order.amount, 0, true))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ object OrderBook {
}

def cancelOrder(ob: OrderBook, orderId: String): Option[OrderCanceled] = {
ob.bids.find { case (_, v) => v.exists(_.order.idStr == orderId)}
.orElse(ob.asks.find { case (_, v) => v.exists(_.order.idStr == orderId)})
ob.bids.find { case (_, v) => v.exists(_.order.idStr() == orderId)}
.orElse(ob.asks.find { case (_, v) => v.exists(_.order.idStr() == orderId)})
.fold(Option.empty[OrderCanceled]) {
case (_, v) =>
Some(OrderCanceled(v.find(_.order.idStr == orderId).get))
Some(OrderCanceled(v.find(_.order.idStr() == orderId).get))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ case class OrderHistoryImpl(p: OrderHistoryStorage) extends OrderHistory with Sc
}

def saveOrder(order: Order): Unit = {
if (!p.orders.containsKey(order.idStr)) {
p.orders.putIfAbsent(order.idStr, order.jsonStr)
if (!p.orders.containsKey(order.idStr())) {
p.orders.putIfAbsent(order.idStr(), order.jsonStr)
}
}

Expand All @@ -78,12 +78,12 @@ case class OrderHistoryImpl(p: OrderHistoryStorage) extends OrderHistory with Sc
saveOrder(lo.order)
saveOrdeInfo(event)
saveOpenPortfolio(event)
savePairAddress(lo.order.assetPair, lo.order.senderPublicKey.address, lo.order.idStr)
savePairAddress(lo.order.assetPair, lo.order.senderPublicKey.address, lo.order.idStr())
}

override def orderExecuted(event: OrderExecuted): Unit = {
saveOrder(event.submitted.order)
savePairAddress(event.submitted.order.assetPair, event.submitted.order.senderPublicKey.address, event.submitted.order.idStr)
savePairAddress(event.submitted.order.assetPair, event.submitted.order.senderPublicKey.address, event.submitted.order.idStr())
saveOrdeInfo(event)
saveOpenPortfolio(OrderAdded(event.submittedExecuted))
saveOpenPortfolio(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ trait OrderValidator {
order.signaturesValid().isRight :| "signature should be valid" &&
order.isValid(NTP.correctedTime()) &&
(order.matcherFee >= settings.minOrderFee) :| s"Order matcherFee should be >= ${settings.minOrderFee}" &&
(orderHistory.orderStatus(order.idStr) == LimitOrder.NotFound) :| "Order is already accepted" &&
(orderHistory.orderStatus(order.idStr()) == LimitOrder.NotFound) :| "Order is already accepted" &&
isBalanceWithOpenOrdersEnough(order)
if (!v) {
Left(GenericError(v.messages()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ object BlockMessageSpec extends MessageSpec[Block] {

override def maxLength = 271 + TransactionMessageSpec.maxLength * Block.MaxTransactionsPerBlockVer3

override def serializeData(block: Block): Array[Byte] = block.bytes
override def serializeData(block: Block): Array[Byte] = block.bytes()

override def deserializeData(bytes: Array[Byte]): Try[Block] = Block.parseBytes(bytes)
}
Expand Down Expand Up @@ -212,7 +212,7 @@ object TransactionMessageSpec extends MessageSpec[Transaction] {
override def deserializeData(bytes: Array[Byte]): Try[Transaction] =
TransactionParser.parseBytes(bytes)

override def serializeData(tx: Transaction): Array[Byte] = tx.bytes
override def serializeData(tx: Transaction): Array[Byte] = tx.bytes()
}

object MicroBlockInvMessageSpec extends MessageSpec[MicroBlockInv] {
Expand Down Expand Up @@ -255,7 +255,7 @@ object MicroBlockResponseMessageSpec extends MessageSpec[MicroBlockResponse] {
override def deserializeData(bytes: Array[Byte]): Try[MicroBlockResponse] =
MicroBlock.parseBytes(bytes).map(MicroBlockResponse)

override def serializeData(resp: MicroBlockResponse): Array[Byte] = resp.microblock.bytes
override def serializeData(resp: MicroBlockResponse): Array[Byte] = resp.microblock.bytes()

override def maxLength = 271 + TransactionMessageSpec.maxLength * MaxTransactionsPerMicroblock

Expand Down
Loading

0 comments on commit b27f5e7

Please sign in to comment.