Skip to content

Commit

Permalink
Merge branch 'master' into node-339-level-db
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeykiselev committed Jan 15, 2018
2 parents a9f3cde + dccf0db commit 2118790
Show file tree
Hide file tree
Showing 20 changed files with 171 additions and 129 deletions.
35 changes: 17 additions & 18 deletions src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import com.wavesplatform.metrics.Metrics
import com.wavesplatform.mining.{Miner, MinerImpl}
import com.wavesplatform.network._
import com.wavesplatform.settings._
import com.wavesplatform.state2.StateWriter
import com.wavesplatform.state2.appender.{BlockAppender, CheckpointAppender, ExtensionAppender, MicroblockAppender}
import com.wavesplatform.utils.{SystemInformationReporter, forceStopApplication}
import io.netty.channel.Channel
Expand Down Expand Up @@ -58,20 +57,19 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
// Start /node API right away
private implicit val as: ActorSystem = actorSystem
private implicit val materializer: ActorMaterializer = ActorMaterializer()

private val nodeApiLauncher = (bcu: BlockchainUpdater, state: StateWriter) =>
Option(settings.restAPISettings.enable).collect { case true =>
val tags = Seq(typeOf[NodeApiRoute])
val routes = Seq(NodeApiRoute(settings.restAPISettings, bcu, state, () => this.shutdown()))
val combinedRoute: Route = CompositeHttpService(actorSystem, tags, routes, settings.restAPISettings).compositeRoute
val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port)
serverBinding = Await.result(httpFuture, 10.seconds)
log.info(s"Node REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}")
(tags, routes)
}
private val (storage, heights) = StorageFactory(db, settings).get
private val nodeApi = Option(settings.restAPISettings.enable).collect { case true =>
val tags = Seq(typeOf[NodeApiRoute])
val routes = Seq(NodeApiRoute(settings.restAPISettings, heights, () => this.shutdown()))
val combinedRoute: Route = CompositeHttpService(actorSystem, tags, routes, settings.restAPISettings).compositeRoute
val httpFuture = Http().bindAndHandle(combinedRoute, settings.restAPISettings.bindAddress, settings.restAPISettings.port)
serverBinding = Await.result(httpFuture, 10.seconds)
log.info(s"Node REST API was bound on ${settings.restAPISettings.bindAddress}:${settings.restAPISettings.port}")
(tags, routes)
}

private val checkpointService = new CheckpointServiceImpl(db, settings.checkpointsSettings)
private val (history, featureProvider, stateReader, blockchainUpdater, blockchainDebugInfo, nodeApi) = StorageFactory(db, settings, nodeApiLauncher).get
private val (history, featureProvider, stateWriter, stateReader, blockchainUpdater, blockchainDebugInfo) = storage()
private lazy val upnp = new UPnP(settings.networkSettings.uPnPSettings) // don't initialize unless enabled
private val wallet: Wallet = Wallet(settings.walletSettings)
private val peerDatabase = new PeerDatabaseImpl(settings.networkSettings)
Expand Down Expand Up @@ -213,17 +211,18 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
upnp.deletePort(addr.getPort)
}

peerDatabase.close()

Try(Await.result(actorSystem.terminate(), stopActorsTimeout))
.failed.map(e => log.error("Failed to terminate actor system", e))

log.debug("Closing storage")
db.close()

log.debug("Closing wallet")
wallet.close()

log.debug("Closing peer database")
peerDatabase.close()

Try(Await.result(actorSystem.terminate(), stopActorsTimeout))
.failed.map(e => log.error("Failed to terminate actor system", e))

log.info("Shutdown complete")
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/wavesplatform/UtxPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ class UtxPoolImpl(time: Time,
.values.asScala.toSeq
.sorted(TransactionsOrdering.InUTXPool)
.foldLeft((Seq.empty[ByteStr], Seq.empty[Transaction], Monoid[Diff].empty)) {
case ((invalid, valid, diff), tx) if valid.size <= max =>
case ((invalid, valid, diff), tx) if valid.lengthCompare(max) <= 0 =>
differ(composite(diff.asBlockDiff, s), tx) match {
case Right(newDiff) if valid.size < max =>
case Right(newDiff) if valid.lengthCompare(max) < 0 =>
(invalid, tx +: valid, Monoid.combine(diff, newDiff))
case Right(_) =>
(invalid, valid, diff)
Expand Down
12 changes: 8 additions & 4 deletions src/main/scala/com/wavesplatform/history/HistoryWriterImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import scorex.block.{Block, BlockHeader}
import scorex.transaction.History.BlockchainScore
import scorex.transaction.ValidationError.GenericError
import scorex.transaction._
import scorex.utils.ScorexLogging
import scorex.utils.{NTP, ScorexLogging, Time}

import scala.util.Try

class HistoryWriterImpl private(db: DB, val synchronizationToken: ReentrantReadWriteLock,
functionalitySettings: FunctionalitySettings, featuresSettings: FeaturesSettings)
functionalitySettings: FunctionalitySettings, featuresSettings: FeaturesSettings, time: Time)
extends SubStorage(db, "history") with PropertiesStorage with VersionedStorage with History with FeatureProvider with ScorexLogging {

override protected val Version: Int = 1
Expand Down Expand Up @@ -187,13 +187,17 @@ class HistoryWriterImpl private(db: DB, val synchronizationToken: ReentrantReadW
override def blockHeaderAndSizeAt(height: Int): Option[(BlockHeader, Int)] =
blockBytes(height).map(bytes => (BlockHeader.parseBytes(bytes).get._1, bytes.length))

private val heightInfo: (Int, Long) = (height(), time.getTimestamp())

override def debugInfo: HeightInfo = heightInfo

private def getBlockSignature(height: Int): Option[ByteStr] = get(makeKey(SignatureAtHeightPrefix, height)).map(ByteStr.apply)
}

object HistoryWriterImpl extends ScorexLogging {
def apply(db: DB, synchronizationToken: ReentrantReadWriteLock, functionalitySettings: FunctionalitySettings,
featuresSettings: FeaturesSettings): Try[HistoryWriterImpl] =
createWithVerification[HistoryWriterImpl](new HistoryWriterImpl(db, synchronizationToken, functionalitySettings, featuresSettings), h => h.isConsistent)
featuresSettings: FeaturesSettings, time: Time = NTP): Try[HistoryWriterImpl] =
createWithVerification[HistoryWriterImpl](new HistoryWriterImpl(db, synchronizationToken, functionalitySettings, featuresSettings, time), h => h.isConsistent)

private val blockHeightStats = Kamon.metrics.histogram("block-height")
private val blockSizeStats = Kamon.metrics.histogram("block-size-bytes")
Expand Down
35 changes: 20 additions & 15 deletions src/main/scala/com/wavesplatform/history/StorageFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@ import java.util.concurrent.locks.{ReentrantReadWriteLock => RWL}
import com.wavesplatform.features.FeatureProvider
import com.wavesplatform.settings.WavesSettings
import com.wavesplatform.state2._
import com.wavesplatform.utils.HeightInfo
import monix.eval.Coeval
import org.iq80.leveldb.DB
import scorex.transaction._
import scorex.utils.NTP
import scorex.utils.{NTP, Time}

import scala.util.{Success, Try}

object StorageFactory {

private def createStateStorage(history: History with FeatureProvider, db: DB): Try[StateStorage] =
type Storage = Coeval[(NgHistory with DebugNgHistory, FeatureProvider, StateWriter, StateReader, BlockchainUpdater, BlockchainDebugInfo)]
type HeightInfos = Coeval[(HeightInfo, HeightInfo)]

private def createStateStorage(history: History with FeatureProvider, db: DB, time: Time): Try[StateStorage] =
StateStorage(db, dropExisting = false).flatMap { ss =>
if (ss.getHeight <= history.height()) Success(ss) else {
StateStorage(db, dropExisting = true)
}
}

def apply[T](db: DB, settings: WavesSettings,
beforeStateUpdate: (BlockchainUpdater, StateWriter) => T = (_: BlockchainUpdater, _: StateWriter) => (),
time: scorex.utils.Time = NTP):
Try[(NgHistory with DebugNgHistory, FeatureProvider, StateReader, BlockchainUpdater, BlockchainDebugInfo, T)] = {
def apply[T](db: DB, settings: WavesSettings, time: Time = NTP): Try[(Storage, HeightInfos)] = {
val lock = new RWL(true)
for {
historyWriter <- HistoryWriterImpl(db, lock, settings.blockchainSettings.functionalitySettings, settings.featuresSettings)
ss <- createStateStorage(historyWriter, db)
stateWriter = new StateWriterImpl(ss, lock)
bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock)
callbackResult = beforeStateUpdate(bcu, stateWriter)
} yield {
bcu.syncPersistedAndInMemory()
val history: NgHistory with DebugNgHistory with FeatureProvider = bcu.historyReader
(history, history, bcu.bestLiquidState, bcu, bcu, callbackResult)
}
ss <- createStateStorage(historyWriter, db, time)
} yield (
Coeval {
val stateWriter = new StateWriterImpl(ss, lock)
val bcu = BlockchainUpdaterImpl(stateWriter, historyWriter, settings, time, lock)
val history: NgHistory with DebugNgHistory with FeatureProvider = bcu.historyReader
(history, history, stateWriter, bcu.bestLiquidState, bcu, bcu)
},
Coeval {
(historyWriter.debugInfo, ss.debugInfo)
}
)
}
}
13 changes: 3 additions & 10 deletions src/main/scala/com/wavesplatform/http/NodeApiRoute.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,19 @@ import javax.ws.rs.Path

import akka.http.scaladsl.server.Route
import com.wavesplatform.Shutdownable
import com.wavesplatform.network.lastObserved
import com.wavesplatform.settings.{Constants, RestAPISettings}
import com.wavesplatform.state2.StateWriter
import com.wavesplatform.utils.HeightInfo
import io.swagger.annotations._
import monix.eval.Coeval
import monix.execution.Scheduler.Implicits.global
import play.api.libs.json.Json
import scorex.api.http.{ApiRoute, CommonApiFunctions}
import scorex.transaction.{BlockchainUpdater, LastBlockInfo}
import scorex.utils.ScorexLogging

@Path("/node")
@Api(value = "node")
case class NodeApiRoute(settings: RestAPISettings, blockchainUpdater: BlockchainUpdater, state: StateWriter, application: Shutdownable)
case class NodeApiRoute(settings: RestAPISettings, heights: Coeval[(HeightInfo, HeightInfo)], application: Shutdownable)
extends ApiRoute with CommonApiFunctions with ScorexLogging {

private val lastHeight: Coeval[Option[LastBlockInfo]] = lastObserved(blockchainUpdater.lastBlockInfo)
private val lastState: Coeval[Option[StateWriter.Status]] = lastObserved(state.status)

override lazy val route = pathPrefix("node") {
stop ~ status ~ version
}
Expand All @@ -48,8 +42,7 @@ case class NodeApiRoute(settings: RestAPISettings, blockchainUpdater: Blockchain
@Path("/status")
@ApiOperation(value = "Status", notes = "Get status of the running core", httpMethod = "GET")
def status: Route = (get & path("status")) {
val (bcHeight, bcTime) = lastHeight().map { case LastBlockInfo(_, h, _, _, t) => (h, t) }.getOrElse((0, 0L))
val (stHeight, stTime) = lastState().map { case StateWriter.Status(h, t) => (h, t) }.getOrElse((0, 0L))
val ((bcHeight, bcTime), (stHeight, stTime)) = heights()
val lastUpdated = bcTime max stTime
complete(Json.obj(
"blockchainHeight" -> bcHeight,
Expand Down
69 changes: 41 additions & 28 deletions src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,58 +124,64 @@ class MinerImpl(
}.delayExecution(delay)


private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[Either[ValidationError, Option[Block]]] = Task {
log.trace(s"Generating microblock for $account")
private def generateOneMicroBlockTask(account: PrivateKeyAccount, accumulatedBlock: Block): Task[MicroblockMiningResult] = {
log.trace(s"Generating microBlock for $account")
val pc = allChannels.size()
val accumulatedCount = accumulatedBlock.transactionCount
lazy val unconfirmed = measureLog("packing unconfirmed transactions for microblock") {
utx.packUnconfirmed(settings.minerSettings.maxTransactionsInMicroBlock, sortInBlock = false)
val maxTxsForMicroblock = Math.min(MaxTransactionsPerBlockVer3 - accumulatedCount, settings.minerSettings.maxTransactionsInMicroBlock)
utx.packUnconfirmed(maxTxsForMicroblock, sortInBlock = false)
}
if (pc < minerSettings.quorum) {
if (accumulatedCount == MaxTransactionsPerBlockVer3) {
log.trace(s"Stopping forging microBlocks, block is already full")
Task.now(Stop)
} else if (pc < minerSettings.quorum) {
log.trace(s"Quorum not available ($pc/${minerSettings.quorum}, not forging microblock with ${account.address}")
Task.now(Right(None))
}
else if (unconfirmed.isEmpty) {
log.trace("skipping microBlock because no txs in utx pool")
Task.now(Right(None))
}
else {
Task.now(Retry)
} else if (unconfirmed.isEmpty) {
log.trace(s"Skipping microBlock because utx is empty")
Task.now(Retry)
} else {
log.trace(s"Accumulated ${unconfirmed.size} txs for microblock")
val start = System.currentTimeMillis()
(for {
signedBlock <- EitherT(Task.now(Block.buildAndSign(
signedBlock <- EitherT.fromEither[Task](Block.buildAndSign(
version = 3,
timestamp = accumulatedBlock.timestamp,
reference = accumulatedBlock.reference,
consensusData = accumulatedBlock.consensusData,
transactionData = accumulatedBlock.transactionData ++ unconfirmed,
signer = account,
featureVotes = accumulatedBlock.featureVotes
)))
microBlock <- EitherT(Task.now(MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature)))
))
microBlock <- EitherT.fromEither[Task](MicroBlock.buildAndSign(account, unconfirmed, accumulatedBlock.signerData.signature, signedBlock.signerData.signature))
_ = microBlockBuildTimeStats.safeRecord(System.currentTimeMillis() - start)
_ <- EitherT(MicroblockAppender(checkpoint, history, blockchainUpdater, utx)(microBlock))
} yield {
BlockStats.mined(microBlock)
log.trace(s"$microBlock has been mined for $account}")
allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig))
Some(signedBlock)
}).value.map {
_.left.map { err =>
log.trace(s"MicroBlock has NOT been mined for $account} because $err")
err
}
} yield (microBlock, signedBlock)).value map {
case Left(err) =>
Error(err)
case Right((microBlock, signedBlock)) =>
BlockStats.mined(microBlock)
log.trace(s"$microBlock has been mined for $account}")
allChannels.broadcast(MicroBlockInv(account, microBlock.totalResBlockSig, microBlock.prevResBlockSig))
Success(signedBlock)
}
}
}.flatten
}

private def generateMicroBlockSequence(account: PrivateKeyAccount, accumulatedBlock: Block, delay: FiniteDuration): Task[Unit] = {
debugState = MinerDebugInfo.MiningMicroblocks
generateOneMicroBlockTask(account, accumulatedBlock).delayExecution(delay).flatMap {
case Left(err) => Task {
case Error(e) => Task {
debugState = MinerDebugInfo.Error(e.toString)
log.warn("Error mining MicroBlock: " + e.toString)
}
case Success(newTotal) => generateMicroBlockSequence(account, newTotal, minerSettings.microBlockInterval)
case Retry => generateMicroBlockSequence(account, accumulatedBlock, minerSettings.microBlockInterval)
case Stop => Task {
debugState = MinerDebugInfo.MiningBlocks
log.warn("Error mining MicroBlock: " + err.toString)
log.debug("MicroBlock mining completed, block is full")
}
case Right(maybeNewTotal) => generateMicroBlockSequence(account, maybeNewTotal.getOrElse(accumulatedBlock), minerSettings.microBlockInterval)
}
}

Expand Down Expand Up @@ -254,4 +260,11 @@ object Miner {
val calculatedOffset = calculatedGenerationTimestamp - timeService.correctedTime()
Math.max(minimalBlockGenerationOffset.toMillis, calculatedOffset).millis
}

sealed trait MicroblockMiningResult
case object Stop extends MicroblockMiningResult
case object Retry extends MicroblockMiningResult
case class Error(e: ValidationError) extends MicroblockMiningResult
case class Success(b: Block) extends MicroblockMiningResult

}
8 changes: 5 additions & 3 deletions src/main/scala/com/wavesplatform/network/HistoryReplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class HistoryReplier(history: NgHistory, settings: SynchronizationSettings) exte
case Some((parent, extension)) =>
log.debug(s"${id(ctx)} Got GetSignatures with ${otherSigs.length}, found common parent $parent and sending ${extension.length} more signatures")
ctx.writeAndFlush(Signatures(parent +: extension))
case None if otherSigs.length == 1 && otherSigs.head == history.lastBlock.get.uniqueId =>
case None if otherSigs.lengthCompare(1) == 0 && otherSigs.head == history.lastBlock.get.uniqueId =>
// this is the special case when both nodes only have genesis block
log.debug(s"${id(ctx)} Both local and remote nodes only have genesis block")
ctx.writeAndFlush(Signatures(otherSigs))
Expand All @@ -52,13 +52,15 @@ class HistoryReplier(history: NgHistory, settings: SynchronizationSettings) exte

case GetBlock(sig) => Task(knownBlocks.get(sig)).map(bytes =>
ctx.writeAndFlush(RawBytes(BlockSpec.messageCode, bytes)))
.runAsyncLogErr
.logErrDiscardNoSuchElementException
.runAsync

case mbr@MicroBlockRequest(totalResBlockSig) =>
Task(knownMicroBlocks.get(totalResBlockSig)).map { bytes =>
ctx.writeAndFlush(RawBytes(MicroBlockResponseSpec.messageCode, bytes))
log.trace(id(ctx) + s"Sent MicroBlockResponse(total=${totalResBlockSig.trim})")
}.runAsyncLogErr
}.logErrDiscardNoSuchElementException
.runAsync

case _: Handshake => Task {
ctx.writeAndFlush(LocalScoreChanged(history.score()))
Expand Down
Loading

0 comments on commit 2118790

Please sign in to comment.