diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/BatcherSubmitter.java b/hildr-batcher/src/main/java/io/optimism/batcher/BatcherSubmitter.java index 5ba933e5..04728ce9 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/BatcherSubmitter.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/BatcherSubmitter.java @@ -17,13 +17,18 @@ package io.optimism.batcher; import com.google.common.util.concurrent.AbstractExecutionThreadService; +import io.optimism.batcher.channel.ChannelConfig; import io.optimism.batcher.channel.ChannelManager; +import io.optimism.batcher.compressor.CompressorConfig; import io.optimism.batcher.config.Config; import io.optimism.batcher.exception.BatcherExecutionException; import io.optimism.batcher.loader.BlockLoader; import io.optimism.batcher.loader.LoaderConfig; import io.optimism.batcher.publisher.ChannelDataPublisher; import io.optimism.batcher.publisher.PublisherConfig; +import io.optimism.type.BlockId; +import io.optimism.type.L1BlockRef; +import io.optimism.utilities.derive.stages.Frame; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.web3j.protocol.core.methods.response.TransactionReceipt; @@ -47,6 +52,8 @@ public class BatcherSubmitter extends AbstractExecutionThreadService { private volatile boolean isShutdownTriggered = false; + private L1BlockRef lastL1Tip; + /** * Constructor of BatcherSubmitter. * @@ -54,12 +61,13 @@ public class BatcherSubmitter extends AbstractExecutionThreadService { */ public BatcherSubmitter(Config config) { this.config = config; - this.channelManager = new ChannelManager(); + this.channelManager = + new ChannelManager(ChannelConfig.from(config), CompressorConfig.from(config)); this.blockLoader = new BlockLoader(LoaderConfig.from(config), this.channelManager::addL2Block); this.channelPublisher = new ChannelDataPublisher( - PublisherConfig.from(config, this.blockLoader.getRollConfig().batchInboxAddress()), + PublisherConfig.from(config, this.blockLoader.getRollConfig()), this.channelManager::txData, this::handleReceipt); } @@ -72,16 +80,17 @@ private void trySubmitBatchData() { Thread.sleep(config.pollInterval()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new BatcherExecutionException(e); + throw new BatcherExecutionException("Batcher thread has been interrupted", e); } } } - private void handleReceipt(TransactionReceipt receipt) { + private void handleReceipt(Frame tx, TransactionReceipt receipt) { if (receipt.isStatusOK()) { - // todo this.channelManager.txConfirmed(); + this.channelManager.txConfirmed( + tx, new BlockId(receipt.getBlockHash(), receipt.getBlockNumber())); } else { - // todo this.channelManager.txFailed(); + this.channelManager.txFailed(tx); } } diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/channel/Channel.java b/hildr-batcher/src/main/java/io/optimism/batcher/channel/Channel.java index 549c413f..6a1e882f 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/channel/Channel.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/channel/Channel.java @@ -16,7 +16,13 @@ package io.optimism.batcher.channel; -import org.apache.commons.lang3.ArrayUtils; +import io.optimism.type.BlockId; +import io.optimism.type.L1BlockInfo; +import io.optimism.utilities.derive.stages.Frame; +import java.io.Closeable; +import java.math.BigInteger; +import java.util.List; +import org.web3j.protocol.core.methods.response.EthBlock; /** * Channel interface. cache batch submit data. @@ -24,26 +30,118 @@ * @author thinkAfCod * @since 0.1.1 */ -public interface Channel { +public interface Channel extends Closeable { - /** Derivation version. */ - byte DERIVATION_VERSION_0 = 0; + /** + * Add block to channel, block will be parsed and be compressed. + * + * @param block Block on L2 + * @return l1 block info + */ + L1BlockInfo addBlock(EthBlock.Block block); + + /** Split channel data to frames. */ + void splitToFrame(); + + /** + * Get next frame data should be published to l1. The data added to the channel will be + * transformed into multiple frames. + * + * @return TxData instance contains frame data + */ + Frame nextFrame(); + + /** + * Push frame to current channel. + * + * @param frame TxData instance + */ + void pushFrame(Frame frame); + + /** + * Get Total frames. + * + * @return total value. + */ + int totalFrames(); + + /** + * Get count of pending frames. + * + * @return count of pending frames + */ + int pendingFrames(); + + /** + * If has frame. + * + * @return true if has data of frame, otherwise false. + */ + boolean hasFrame(); + + /** + * Has none pending tx data. + * + * @return true if has none pending tx data, otherwise false. + */ + boolean noneSubmitted(); + + /** + * Check is tx data fully submitted. + * + * @return ture if fully submitted, otherwise false. + */ + boolean isFullySubmitted(); + + /** + * Process failed tx that belong to the channel. Will push tx back to pending queue. + * + * @param tx failed tx data + */ + void txFailed(Frame tx); + + /** + * Process confirmed tx that belong to the channel. + * + * @param tx confirmed tx data + * @param inclusionBlock tx data be inclusion block number + * @return if channel was timeout, the blocks added to the channel will be returned. + */ + List txConfirmed(Frame tx, BlockId inclusionBlock); + + /** + * If channel touch limit of frame data. + * + * @return true if full of data, otherwise false. + */ + boolean isFull(); + + /** + * If the channel data expired at the specified block height. + * + * @param blockNumber block height number + * @return true if timeout,otherwise false. + */ + boolean isTimeout(BigInteger blockNumber); + + /** + * Update channel data expired at the specified block height. + * + * @param blockNumber block height number + */ + void updateTimeout(final BigInteger blockNumber); + + /** + * Input bytes data. + * + * @return input bytes + */ + int inputBytesLength(); /** - * Channel Tx Data class. + * Ready to publishing bytes. * - * @param data L2 block data that will send to L1 - * @param channelId channelId - * @param frameNumber channel frame number + * @return ready to publishing bytes */ - record TxData(byte[] data, byte[] channelId, int frameNumber) { - /** - * Get tx bytes. - * - * @return tx bytes - */ - public byte[] txBytes() { - return ArrayUtils.addAll(new byte[] {DERIVATION_VERSION_0}, data()); - } - } + int readyBytesLength(); } diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelConfig.java b/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelConfig.java index d866e0e7..dd42cbca 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelConfig.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelConfig.java @@ -23,12 +23,19 @@ * * @param channelTimeout The maximum number of L1 blocks that the inclusion transactions of a * channel's frames can span. - * @param maxChannelDuration If 0, duration checks are disabled. + * @param maxChannelDuration Timeout of max block number.If 0, duration checks are disabled. * @param maxFrameSize The maximum byte-size a frame can have. + * @param seqWindowSize The maximum byte-size a frame can have. + * @param subSafetyMargin The maximum byte-size a frame can have. * @author thinkAfCod * @since 0.1.1 */ -public record ChannelConfig(long channelTimeout, long maxChannelDuration, int maxFrameSize) { +public record ChannelConfig( + long channelTimeout, + long maxChannelDuration, + int maxFrameSize, + long seqWindowSize, + long subSafetyMargin) { /** * Create a ChannelConfig instance from Config instance. @@ -37,6 +44,6 @@ public record ChannelConfig(long channelTimeout, long maxChannelDuration, int ma * @return ChannelConfig instance */ public static ChannelConfig from(Config config) { - return new ChannelConfig(30000, 0, 120_000); + return new ChannelConfig(30000, 0, 120_000, 3600, 10); } } diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelImpl.java b/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelImpl.java index 3425b20d..d5b1cd85 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelImpl.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelImpl.java @@ -16,6 +16,28 @@ package io.optimism.batcher.channel; +import io.optimism.batcher.compressor.Compressor; +import io.optimism.batcher.exception.UnsupportedException; +import io.optimism.type.BlockId; +import io.optimism.type.L1BlockInfo; +import io.optimism.utilities.derive.stages.Batch; +import io.optimism.utilities.derive.stages.Frame; +import java.io.IOException; +import java.math.BigInteger; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.web3j.protocol.core.methods.response.EthBlock; +import org.web3j.tuples.generated.Tuple2; +import org.web3j.utils.Numeric; + /** * Channel class.Record the batcher data of block transaction and process this data with framing. * @@ -24,6 +46,359 @@ */ public class ChannelImpl implements Channel { - /** Constructor of ChannelImpl. */ - public ChannelImpl() {} + private static final Logger LOGGER = LoggerFactory.getLogger(ChannelImpl.class); + + private static final int MAX_UNSIGNED_SHORT = (1 << 16) - 1; + + private static final String DEPOSIT_TX_TYPE = "0x7E"; + + private static final int CH_ID_LEN = 16; + + private static final int MAX_RLP_BYTES_PER_CHANNEL = 10_000_000; + + private final ChannelConfig chConfig; + + private final BigInteger seqWindowTimeout; + + private final BigInteger id; + + private AtomicInteger frameNumber; + + private List outputFrames; + + private Map pendingTxs; + + private Map confirmedTxs; + + private List blocks; + + private BigInteger timeoutBlock; + + private Compressor compressor; + + private AtomicInteger rlpLength; + + private volatile boolean isFull; + + private volatile boolean isClose; + + /** + * Constructor of ChannelImpl. + * + * @param chConfig channel config + * @param compressor block data compressor + */ + public ChannelImpl(ChannelConfig chConfig, Compressor compressor) { + this.chConfig = chConfig; + this.seqWindowTimeout = + BigInteger.valueOf(chConfig.seqWindowSize() - chConfig.subSafetyMargin()); + this.compressor = compressor; + try { + var chIdBytes = new byte[CH_ID_LEN]; + SecureRandom.getInstanceStrong().nextBytes(chIdBytes); + this.id = Numeric.toBigInt(chIdBytes); + } catch (NoSuchAlgorithmException e) { + throw new UnsupportedException(e); + } + this.blocks = new ArrayList<>(); + this.outputFrames = new ArrayList<>(); + this.pendingTxs = new HashMap<>(); + this.confirmedTxs = new HashMap<>(); + this.rlpLength = new AtomicInteger(); + this.frameNumber = new AtomicInteger(); + this.isFull = false; + this.isClose = false; + } + + @Override + public L1BlockInfo addBlock(final EthBlock.Block block) { + if (this.isFull()) { + throw new ChannelFullException("this channel has been full of block data"); + } + if (this.isClose) { + throw new ChannelException("channel already closed"); + } + final Tuple2 l1InfoAndBatch = this.blockToBatch(block); + final L1BlockInfo l1Info = l1InfoAndBatch.component1(); + final Batch batch = l1InfoAndBatch.component2(); + try { + this.addBatch(batch); + this.blocks.add(block); + } catch (ChannelFullException e) { + this.isFull = true; + } + this.updateSeqWindowTimeout(batch); + return l1Info; + } + + @Override + public void splitToFrame() { + if (this.isFull()) { + this.closeAndOutputAllFrames(); + return; + } + this.outputReadyFrames(); + } + + @Override + public Frame nextFrame() { + if (this.outputFrames.size() == 0) { + throw new ChannelException("not next frame"); + } + var tx = this.outputFrames.remove(0); + this.pendingTxs.put(tx.code(), tx); + return tx; + } + + @Override + public void pushFrame(Frame frame) { + if (frame.channelId().equals(this.id)) { + throw new ChannelException("wrong channel"); + } + this.outputFrames.add(frame); + } + + @Override + public int totalFrames() { + return this.frameNumber.get() + 1; + } + + @Override + public int pendingFrames() { + return this.outputFrames.size(); + } + + @Override + public boolean hasFrame() { + return this.outputFrames.size() > 0; + } + + @Override + public void txFailed(Frame tx) { + // todo metrics record batch tx failed. + var code = tx.code(); + if (!this.pendingTxs.containsKey(code)) { + LOGGER.warn( + "unkown tx marked as failed: chId :{}; frameNum: {}", tx.channelId(), tx.frameNumber()); + return; + } + LOGGER.trace( + "marked transaction as failed: chId :{}; frameNum: {}", tx.channelId(), tx.frameNumber()); + this.pushFrame(tx); + this.pendingTxs.remove(code); + } + + @Override + public List txConfirmed(Frame tx, BlockId inclusionBlock) { + // todo metrics RecordBatchTxSubmitted + LOGGER.debug( + "marked tx as confirmed: chId: {}; frameNum: {}; block: {}", + tx.channelId(), + tx.frameNumber(), + inclusionBlock); + var code = tx.code(); + if (!this.pendingTxs.containsKey(code)) { + LOGGER.warn( + "unknown transaction marked as confirmed: chId: {}; frameNum: {}; block: {}", + tx.channelId(), + tx.frameNumber(), + inclusionBlock); + return null; + } + this.pendingTxs.remove(code); + this.confirmedTxs.put(code, inclusionBlock); + var timeout = + inclusionBlock + .number() + .add(BigInteger.valueOf(chConfig.channelTimeout())) + .subtract(BigInteger.valueOf(chConfig.subSafetyMargin())); + this.updateTimeout(timeout); + if (this.isTimeout()) { + // todo metrics recordChannelTimeout + LOGGER.warn("Channel timeout: chId:{}", tx.channelId()); + return this.blocks; + } + if (this.isFullySubmitted()) { + // todo metrics RecordChannelFullySubmitted + LOGGER.info("Channel is fully submitted: chId:{}", tx.channelId()); + } + return null; + } + + @Override + public boolean isFull() { + return this.isFull; + } + + @Override + public boolean noneSubmitted() { + return this.confirmedTxs.size() == 0 && this.pendingTxs.size() == 0; + } + + @Override + public boolean isFullySubmitted() { + return this.isFull() && (this.pendingTxs.size() + this.pendingFrames() == 0); + } + + @Override + public boolean isTimeout(BigInteger blockNumber) { + return this.timeoutBlock.equals(blockNumber); + } + + private boolean isTimeout() { + if (this.confirmedTxs.size() == 0) { + return false; + } + var min = BigInteger.valueOf(Long.MAX_VALUE); + var max = BigInteger.ZERO; + Collection inclusionBlockIds = this.confirmedTxs.values(); + for (BlockId inclusionBlockId : inclusionBlockIds) { + var inclusionBlockNumber = inclusionBlockId.number(); + if (inclusionBlockNumber.compareTo(min) < 0) { + min = inclusionBlockNumber; + } + if (inclusionBlockNumber.compareTo(max) > 0) { + max = inclusionBlockNumber; + } + } + return max.subtract(min).compareTo(BigInteger.valueOf(this.chConfig.channelTimeout())) >= 0; + } + + /** + * update channel data timeout block number. + * + * @param blockNumber block height number + */ + @Override + public void updateTimeout(final BigInteger blockNumber) { + if (this.timeoutBlock == null || this.timeoutBlock.compareTo(blockNumber) > 0) { + this.timeoutBlock = blockNumber; + } + } + + @Override + public int inputBytesLength() { + return this.rlpLength.get(); + } + + @Override + public int readyBytesLength() { + return this.compressor.length(); + } + + @Override + public void close() { + if (this.isClose) { + throw new ChannelException("channel has been closed"); + } + this.isClose = true; + try { + this.compressor.close(); + } catch (IOException e) { + throw new ChannelException("compressor closed failed", e); + } + } + + private Tuple2 blockToBatch(EthBlock.Block block) { + final List blockTxs = block.getTransactions(); + if (blockTxs == null || blockTxs.size() == 0) { + throw new ChannelException(String.format("block %s has no transations", block.getHash())); + } + final EthBlock.TransactionObject depositTxObj = (EthBlock.TransactionObject) blockTxs.get(0); + if (!DEPOSIT_TX_TYPE.equalsIgnoreCase(depositTxObj.getType())) { + throw new ChannelException("block txs not contains deposit tx"); + } + final L1BlockInfo l1Info = + L1BlockInfo.from(Numeric.hexStringToByteArray(depositTxObj.getInput())); + + final List txDataList = new ArrayList<>(blockTxs.size()); + for (int i = 1; i < blockTxs.size(); i++) { + final EthBlock.TransactionObject txObj = (EthBlock.TransactionObject) blockTxs.get(i); + if (DEPOSIT_TX_TYPE.equalsIgnoreCase(txObj.getType())) { + continue; + } + txDataList.add(txObj.getInput()); + } + return new Tuple2( + l1Info, + new Batch( + block.getParentHash(), + l1Info.number(), + l1Info.blockHash(), + block.getTimestamp(), + txDataList, + null)); + } + + private int addBatch(Batch batch) { + if (this.isClose) { + throw new ChannelException("channel already closed"); + } + byte[] encode = batch.encode(); + if ((this.rlpLength.get() + encode.length) > MAX_RLP_BYTES_PER_CHANNEL) { + throw new ChannelFullException( + String.format( + "could not add %d bytes to channel of %d bytes, max is %d", + encode.length, this.rlpLength.get(), MAX_RLP_BYTES_PER_CHANNEL)); + } + this.rlpLength.addAndGet(encode.length); + return this.compressor.write(encode); + } + + private void closeAndOutputAllFrames() { + this.close(); + boolean isLastFrame = false; + while (!isLastFrame) { + isLastFrame = createFrame(); + } + } + + private void outputReadyFrames() { + while (this.readyBytesLength() >= this.chConfig.maxFrameSize()) { + boolean isLastFrame = this.createFrame(); + if (isLastFrame) { + break; + } + } + } + + private boolean createFrame() { + var frame = this.frame(this.chConfig.maxFrameSize()); + if (frame.frameNumber() == MAX_UNSIGNED_SHORT) { + this.isFull = true; + } + this.outputFrames.add(frame); + // todo numFrames++ + // todo outputBytes += len(frame.data) + return frame.isLastFrame(); + } + + private Frame frame(final int maxSize) { + if (maxSize < Frame.FRAME_V0_OVER_HEAD_SIZE) { + throw new ChannelException("maxSize is too small to fit the fixed frame overhead"); + } + var lastFrameFlag = false; + var dataSize = maxSize - Frame.FRAME_V0_OVER_HEAD_SIZE; + if (dataSize > this.compressor.length()) { + dataSize = this.compressor.length(); + lastFrameFlag = this.isClose; + } + + byte[] data = new byte[dataSize]; + int read = this.compressor.read(data); + if (read != data.length) { + read = read == -1 ? 0 : read; + byte[] temp = new byte[read]; + System.arraycopy(data, 0, temp, 0, read); + data = temp; + } + var frame = Frame.create(this.id, this.frameNumber.get(), data, lastFrameFlag); + this.frameNumber.addAndGet(1); + return frame; + } + + private void updateSeqWindowTimeout(final Batch batch) { + var timeout = batch.epochNum().add(this.seqWindowTimeout); + this.updateTimeout(timeout); + } } diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelManager.java b/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelManager.java index 2625fd6e..27086eb4 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelManager.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/channel/ChannelManager.java @@ -16,10 +16,21 @@ package io.optimism.batcher.channel; +import io.optimism.batcher.compressor.CompressorConfig; +import io.optimism.batcher.compressor.Compressors; import io.optimism.type.BlockId; +import io.optimism.type.L1BlockInfo; +import io.optimism.type.L2BlockRef; +import io.optimism.utilities.derive.stages.Frame; +import java.math.BigInteger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.web3j.protocol.core.methods.response.EthBlock; /** @@ -30,13 +41,37 @@ */ public class ChannelManager { + private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class); + + private final ChannelConfig chConfig; + + private final CompressorConfig compressorConfig; + private List blocks; + private final List channels; + private String latestBlockHash; - /** Constructor of ChannelManager. */ - public ChannelManager() { + private volatile boolean isClosed; + + private Channel latestChannel; + + private Map txChMap; + + /** + * Constructor of ChannelManager. + * + * @param chConfig channel config + * @param compressorConfig compressor config + */ + public ChannelManager(final ChannelConfig chConfig, final CompressorConfig compressorConfig) { + this.chConfig = chConfig; + this.compressorConfig = compressorConfig; this.blocks = new ArrayList<>(256); + this.channels = new ArrayList<>(256); + this.isClosed = false; + this.txChMap = new HashMap<>(); } /** @@ -65,28 +100,173 @@ public void addL2Block(EthBlock.Block block) { * @param l1Head l1 head block id * @return The next tx data that should be submitted to L1. */ - public Channel.TxData txData(BlockId l1Head) { - return null; + public Frame txData(final BlockId l1Head) { + Channel framesSource = null; + for (Channel channel : channels) { + if (channel.hasFrame()) { + framesSource = channel; + break; + } + } + if (framesSource != null || this.isClosed) { + return this.nextFrameData(framesSource); + } + // no channel + if (!this.hasSpace(this.latestChannel)) { + this.latestChannel = this.openChannel(l1Head); + } + this.pushBlocks(this.latestChannel); + this.updateChannelTimeout(l1Head); + + return this.nextFrameData(this.latestChannel); } /** * Records a transaction as failed. It will attempt to resubmit the data in the failed * transaction. * - * @param txId channel tx id + * @param tx channel tx */ - public void txFailed(Channel.TxData txId) {} + public void txFailed(final Frame tx) { + var code = tx.code(); + if (!this.txChMap.containsKey(code)) { + LOGGER.warn("transaction from unkown channel marked as failed: id: {}", tx.channelId()); + return; + } + Channel ch = this.txChMap.remove(tx.code()); + ch.txFailed(tx); + if (!this.isClosed || !ch.noneSubmitted()) { + return; + } + LOGGER.info( + "Channel has no submitted transactions, clearing for shutdown: chId: {}", tx.channelId()); + this.channels.remove(ch); + if (this.latestChannel.equals(ch)) { + this.latestChannel = null; + } + } /** * Marks a transaction as confirmed on L1. Unfortunately even if all frames in a channel have been * marked as confirmed on L1 the channel may be invalid and need to be resubmitted. This function * may reset the pending channel if the pending channel has timed out. * - * @param txId channel tx id + * @param tx channel tx * @param inclusionBlock inclusion block id */ - public void txConfirmed(Channel.TxData txId, BlockId inclusionBlock) {} + public void txConfirmed(final Frame tx, final BlockId inclusionBlock) { + // todo metrics RecordBatchTxSubmitted + LOGGER.debug( + "marked transaction as confirmed: chId: {}; frameNum: {};block: {}", + tx.channelId(), + tx.frameNumber(), + inclusionBlock.number()); + var code = tx.code(); + if (!this.txChMap.containsKey(code)) { + LOGGER.warn( + "transaction from unknown channel marked as confirmed: chId: {}; frameNum: {};block: {}", + tx.channelId(), + tx.frameNumber(), + inclusionBlock.number()); + return; + } + final var ch = this.txChMap.remove(code); + List blocks = ch.txConfirmed(tx, inclusionBlock); + if (blocks != null && blocks.size() > 0) { + this.blocks.addAll(blocks); + } + if (!ch.isFullySubmitted()) { + return; + } + this.channels.remove(ch); + if (this.latestChannel.equals(ch)) { + this.latestChannel = null; + } + } - /** Clear blocks and channels that have not entered the pending state. */ - public void clear() {} + /** Close channel manager. */ + public void close() { + if (!isClosed) { + this.isClosed = true; + } else { + throw new ChannelException("channel manager has been closed"); + } + } + + /** + * Clears the entire state of the channel manager. It is intended to be used after an L2 reorg. + */ + public void clear() { + LOGGER.trace("clearing channel manager state"); + this.blocks.clear(); + this.isClosed = false; + this.latestChannel = null; + this.channels.clear(); + this.txChMap.clear(); + } + + private Frame nextFrameData(final Channel ch) { + if (ch == null || !ch.hasFrame()) { + return null; + } + var txData = ch.nextFrame(); + this.txChMap.put(txData.code(), ch); + return txData; + } + + private boolean hasSpace(final Channel channel) { + return channel != null && !channel.isFull(); + } + + private Channel openChannel(final BlockId l1Head) { + Channel ch = new ChannelImpl(this.chConfig, Compressors.create(this.compressorConfig)); + LOGGER.info( + "Created a channel: id:{}, l1Head: {}, blocksPending:{}", ch, l1Head, this.blocks.size()); + // todo metrics record opened channel + return ch; + } + + private void pushBlocks(final Channel lastChannel) { + int blocksAdded = 0; + L2BlockRef unused = null; + try { + for (final EthBlock.Block block : this.blocks) { + final L1BlockInfo l1Info = lastChannel.addBlock(block); + unused = L2BlockRef.fromBlockAndL1Info(block, l1Info); + // todo metrics recordL2BlockInChannel + if (latestChannel.isFull()) { + break; + } + blocksAdded += 1; + } + } catch (ChannelException e) { + if (!(e instanceof ChannelFullException)) { + LOGGER.error( + "adding block[{}] to channel failed", this.blocks.get(blocksAdded).getNumber(), e); + } + } + + if (blocksAdded == this.blocks.size()) { + this.blocks.clear(); + } else { + this.blocks = this.blocks.stream().skip(blocksAdded).collect(Collectors.toList()); + } + + // todo metrics RecordL2BlocksAdded + + LOGGER.debug( + "Added blocks to channel:" + + " blocksAdded: {}, blocksPending: {}," + + " channelFull: {}, inputBytes: {}, readyBytes: {}", + blocksAdded, + this.blocks.size(), + this.latestChannel.isFull(), + this.latestChannel.inputBytesLength(), + this.latestChannel.readyBytesLength()); + } + + private void updateChannelTimeout(BlockId l1Head) { + this.latestChannel.updateTimeout( + l1Head.number().add(BigInteger.valueOf(chConfig.maxChannelDuration()))); + } } diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/loader/BlockLoader.java b/hildr-batcher/src/main/java/io/optimism/batcher/loader/BlockLoader.java index b4e967ac..531c55a8 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/loader/BlockLoader.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/loader/BlockLoader.java @@ -129,56 +129,59 @@ private void loadBlocksIntoState() { final BigInteger start = blockNumbers.component1().number(); final BigInteger end = blockNumbers.component2(); var stopBlock = end.add(BigInteger.ONE); - EthBlock.Block lastestBlock = null; + EthBlock.Block lastBlock = null; for (BigInteger i = start.add(BigInteger.ONE); i.compareTo(stopBlock) < 0; i = i.add(BigInteger.ONE)) { EthBlock.Block block = this.loadBlockToChannel(i); this.latestLoadedBlock = BlockId.from(block); - lastestBlock = block; + lastBlock = block; } - if (lastestBlock == null) { + if (lastBlock == null) { throw new BlockLoaderException("get latest block failed"); } - var ignore = l2BlockToBlockRef(lastestBlock, rollupConfig.genesis()); + var ignore = l2BlockToBlockRef(lastBlock, rollupConfig.genesis()); // todo metrics.RecordL2BlocksLoaded l2Ref } private Tuple2 calculateL2BlockRangeToStore() { final Request req = new Request<>(OP_SYNC_STATUS, List.of(), this.rollupService, OpEthSyncStatusRes.class); + OpEthSyncStatusRes.OpEthSyncStatus syncStatus = null; try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { var future = scope.fork(req::send); scope.join(); scope.throwIfFailed(); - var syncStatus = future.resultNow().getOpEthSyncStatus(); - if (syncStatus.headL1().equals(L1BlockRef.emptyBlock)) { - throw new SyncStatusException("empty sync status"); - } - if (latestLoadedBlock == null || latestLoadedBlock.number().compareTo(BigInteger.ZERO) == 0) { - LOGGER.info("Starting batch-submitter work at L2 safe-head: {}", syncStatus.safeL2()); - latestLoadedBlock = syncStatus.safeL2().toId(); - } else if (latestLoadedBlock.number().compareTo(syncStatus.safeL2().number()) <= 0) { - LOGGER.warn( - "last submitted block lagged behind L2 safe head: batch submission will continue"); - latestLoadedBlock = syncStatus.safeL2().toId(); - } + syncStatus = future.resultNow().getOpEthSyncStatus(); + } catch (ExecutionException e) { + throw new Web3jCallException("StructuredTaskScope execute syncStatus failed:", e); + } catch (InterruptedException e) { + throw new Web3jCallException( + "Thread has been interrupted while calling calculateL2BlockRangeToStore:", e); + } + if (syncStatus.headL1().equals(L1BlockRef.emptyBlock)) { + throw new SyncStatusException("empty sync status"); + } + if (latestLoadedBlock == null || latestLoadedBlock.number().equals(BigInteger.ZERO)) { + LOGGER.info("Starting batch-submitter work at L2 safe-head: {}", syncStatus.safeL2()); + latestLoadedBlock = syncStatus.safeL2().toId(); + } else if (latestLoadedBlock.number().compareTo(syncStatus.safeL2().number()) <= 0) { + LOGGER.warn( + "last submitted block lagged behind L2 safe head: batch submission will continue"); + latestLoadedBlock = syncStatus.safeL2().toId(); + } - if (syncStatus.safeL2().number().compareTo(syncStatus.unsafeL2().number()) >= 0 - || latestLoadedBlock.number().compareTo(syncStatus.unsafeL2().number()) >= 0) { - throw new SyncStatusException("L2 safe head ahead of L2 unsafe head"); - } - return new Tuple2<>(latestLoadedBlock, syncStatus.unsafeL2().number()); - } catch (ExecutionException | InterruptedException e) { - Thread.currentThread().interrupt(); - throw new Web3jCallException("failed to get syncStatus", e); + if (syncStatus.safeL2().number().compareTo(syncStatus.unsafeL2().number()) >= 0 + || latestLoadedBlock.number().compareTo(syncStatus.unsafeL2().number()) >= 0) { + throw new SyncStatusException("L2 safe head ahead of L2 unsafe head"); } + return new Tuple2<>(latestLoadedBlock, syncStatus.unsafeL2().number()); } private L2BlockRef l2BlockToBlockRef(final EthBlock.Block block, Genesis genesis) { BlockId l1Origin = null; BigInteger sequenceNumber = null; - if (block.getNumber().compareTo(genesis.l2().number()) == 0) { + if (block.getNumber().equals(genesis.l2().number())) { if (!block.getHash().equals(genesis.l2().hash())) { throw new BlockLoaderException( String.format( diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/publisher/ChannelDataPublisher.java b/hildr-batcher/src/main/java/io/optimism/batcher/publisher/ChannelDataPublisher.java index 2a464dd4..2c1d1472 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/publisher/ChannelDataPublisher.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/publisher/ChannelDataPublisher.java @@ -16,29 +16,34 @@ package io.optimism.batcher.publisher; -import io.optimism.batcher.channel.Channel; import io.optimism.batcher.exception.Web3jCallException; import io.optimism.type.BlockId; import io.optimism.type.L1BlockRef; -import io.optimism.type.TxCandidate; +import io.optimism.utilities.derive.stages.Frame; import io.optimism.utilities.gas.GasCalculator; import io.optimism.utilities.rpc.Web3jProvider; import io.optimism.utilities.telemetry.TracerTaskWrapper; import java.io.Closeable; +import java.math.BigInteger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.function.Function; import jdk.incubator.concurrent.StructuredTaskScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.web3j.crypto.Credentials; +import org.web3j.crypto.RawTransaction; import org.web3j.protocol.Web3j; import org.web3j.protocol.core.DefaultBlockParameterName; +import org.web3j.protocol.core.methods.request.Transaction; import org.web3j.protocol.core.methods.response.EthBlock; +import org.web3j.protocol.core.methods.response.EthEstimateGas; +import org.web3j.protocol.core.methods.response.EthGetTransactionCount; import org.web3j.protocol.core.methods.response.EthGetTransactionReceipt; -import org.web3j.protocol.core.methods.response.EthSendTransaction; import org.web3j.protocol.core.methods.response.TransactionReceipt; import org.web3j.tx.RawTransactionManager; +import org.web3j.utils.Numeric; /** * ChannelDataPublisher class. It will get tx data from channelManager and push it to L1. @@ -53,16 +58,20 @@ public class ChannelDataPublisher implements Closeable { private final PublisherConfig config; + private final String fromAddress; + private final Web3j l1Client; private final RawTransactionManager txManager; - private final Function dataSupplier; + private final Function dataSupplier; - private final Consumer txReceiptReturn; + private final BiConsumer txReceiptReturn; private L1BlockRef lastL1Tip; + private BigInteger nonce; + /** * Constructor of ChannelDataPublisher. * @@ -72,11 +81,14 @@ public class ChannelDataPublisher implements Closeable { */ public ChannelDataPublisher( PublisherConfig config, - Function dataSupplier, - Consumer txReceiptReturn) { + Function dataSupplier, + BiConsumer txReceiptReturn) { this.config = config; this.l1Client = Web3jProvider.createClient(config.l1RpcUrl()); - this.txManager = new RawTransactionManager(l1Client, null); + var credentials = Credentials.create(config.l1Signer()); + this.fromAddress = credentials.getAddress(); + this.txManager = + new RawTransactionManager(l1Client, credentials, config.l1chainId().longValue()); this.dataSupplier = dataSupplier; this.txReceiptReturn = txReceiptReturn; } @@ -106,7 +118,7 @@ public void close() { private boolean publishTxToL1() { final L1BlockRef l1HeadBlockRef = getL1HeadBlockRef(); this.recordL1Head(l1HeadBlockRef); - Channel.TxData txData = dataSupplier.apply(l1HeadBlockRef.toId()); + var txData = dataSupplier.apply(l1HeadBlockRef.toId()); if (txData == null) { LOGGER.trace("no transaction data available"); throw new NoDataPublishException("no transaction data available"); @@ -115,16 +127,33 @@ private boolean publishTxToL1() { return true; } - private void sendTx(Channel.TxData txData) { - byte[] txBytes = txData.txBytes(); + private void sendTx(final Frame txData) { + final String txBytes = Numeric.toHexString(txData.txBytes()); + final String to = this.config.batchInboxAddress(); long intrinsicGas = - GasCalculator.intrinsicGasWithoutAccessList(txBytes, false, true, true, false); - var txCandidate = new TxCandidate(txBytes, this.config.batchInboxAddress(), intrinsicGas); - EthSendTransaction ethSendTransaction = null; - String txHash = ethSendTransaction.getTransactionHash(); + GasCalculator.intrinsicGasWithoutAccessList(txData.txBytes(), false, true, true, false); + + var maxPriorityFeePerGas = this.getMaxPriorityFeePerGas(); + var baseFee = this.getBaseFee(); + var gasFeeCap = GasCalculator.calcGasFeeCap(baseFee, maxPriorityFeePerGas); + var gasLimit = + intrinsicGas == 0L + ? this.getEstimateGas(this.config.batchInboxAddress(), txBytes) + : BigInteger.valueOf(intrinsicGas); + + var rawTx = + RawTransaction.createTransaction( + this.config.l1chainId().longValue(), + this.nextNonce(), + gasLimit, + to, + BigInteger.ZERO, + txBytes, + maxPriorityFeePerGas, + gasFeeCap); + String txHash = this.signAndSend(rawTx); var txReceipt = this.getTxReceipt(txHash); - txReceiptReturn.accept(txReceipt.getTransactionReceipt().get()); - // todo use txManager send tx + txReceiptReturn.accept(txData, txReceipt.getTransactionReceipt().get()); } private EthGetTransactionReceipt getTxReceipt(final String txHash) { @@ -169,4 +198,100 @@ private void recordL1Head(L1BlockRef headRef) { this.lastL1Tip = headRef; // todo metrics LatestL1Block } + + private synchronized BigInteger nextNonce() { + if (this.nonce != null) { + this.nonce = this.nonce.add(BigInteger.ONE); + return this.nonce; + } + EthGetTransactionCount txCount = null; + try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { + var countFuture = + scope.fork( + TracerTaskWrapper.wrap( + () -> { + var countReq = + this.l1Client.ethGetTransactionCount( + this.fromAddress, DefaultBlockParameterName.LATEST); + return countReq.send(); + })); + scope.join(); + scope.throwIfFailed(); + txCount = countFuture.resultNow(); + this.nonce = txCount.getTransactionCount(); + return this.nonce; + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + throw new Web3jCallException("get tx count failed", e); + } + } + + private BigInteger getMaxPriorityFeePerGas() { + try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { + var gasFuture = + scope.fork( + TracerTaskWrapper.wrap( + () -> { + var countReq = this.l1Client.ethMaxPriorityFeePerGas(); + return countReq.send(); + })); + scope.join(); + scope.throwIfFailed(); + return gasFuture.resultNow().getMaxPriorityFeePerGas(); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + throw new Web3jCallException("get max priority fee gas failed", e); + } + } + + private BigInteger getBaseFee() { + try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { + var gasFuture = + scope.fork( + TracerTaskWrapper.wrap( + () -> { + var headReq = + this.l1Client.ethGetBlockByNumber(DefaultBlockParameterName.LATEST, false); + return headReq.send(); + })); + scope.join(); + scope.throwIfFailed(); + return gasFuture.resultNow().getBlock().getBaseFeePerGas(); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + throw new Web3jCallException("get l1 head block failed", e); + } + } + + private BigInteger getEstimateGas(final String to, final String data) { + EthEstimateGas gas = null; + try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { + var gasFuture = + scope.fork( + TracerTaskWrapper.wrap( + () -> { + final Transaction txParam = + new Transaction(this.fromAddress, null, null, null, to, null, data); + return this.l1Client.ethEstimateGas(txParam).send(); + })); + scope.join(); + scope.throwIfFailed(); + return gasFuture.resultNow().getAmountUsed(); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + throw new Web3jCallException("get tx count failed", e); + } + } + + private String signAndSend(final RawTransaction rawTx) { + try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { + var txResultFuture = scope.fork(TracerTaskWrapper.wrap(() -> txManager.signAndSend(rawTx))); + scope.join(); + scope.throwIfFailed(); + return txResultFuture.resultNow().getTransactionHash(); + } catch (InterruptedException | ExecutionException e) { + Thread.currentThread().interrupt(); + throw new Web3jCallException("sign and send tx failed", e); + } + } } diff --git a/hildr-batcher/src/main/java/io/optimism/batcher/publisher/PublisherConfig.java b/hildr-batcher/src/main/java/io/optimism/batcher/publisher/PublisherConfig.java index 94dbbd0c..25bb1a48 100644 --- a/hildr-batcher/src/main/java/io/optimism/batcher/publisher/PublisherConfig.java +++ b/hildr-batcher/src/main/java/io/optimism/batcher/publisher/PublisherConfig.java @@ -17,26 +17,34 @@ package io.optimism.batcher.publisher; import io.optimism.batcher.config.Config; +import io.optimism.type.RollupConfigRes; +import java.math.BigInteger; /** * Publisher Config class. * * @param l1RpcUrl L1 rpc url * @param l1Signer L1 signer private key + * @param l1chainId L1 chain id * @param batchInboxAddress Address of BatchInboxContract on L1 * @author thinkAfCod * @since 0.1.1 */ -public record PublisherConfig(String l1RpcUrl, String l1Signer, String batchInboxAddress) { +public record PublisherConfig( + String l1RpcUrl, String l1Signer, BigInteger l1chainId, String batchInboxAddress) { /** * Create a PublisherConfig instance from Config instance. * * @param config Config instance - * @param batchInboxAddress address of BatchInboxContract + * @param rollupConfig Rollup config, get from rollup node api * @return PublisherConfig instance */ - public static PublisherConfig from(Config config, String batchInboxAddress) { - return new PublisherConfig(config.l1RpcUrl(), config.l1Signer(), batchInboxAddress); + public static PublisherConfig from(Config config, RollupConfigRes.RollupConfig rollupConfig) { + return new PublisherConfig( + config.l1RpcUrl(), + config.l1Signer(), + rollupConfig.l1ChainId(), + rollupConfig.batchInboxAddress()); } } diff --git a/hildr-batcher/src/main/java/io/optimism/type/BlockId.java b/hildr-batcher/src/main/java/io/optimism/type/BlockId.java index 19deade3..955e3707 100644 --- a/hildr-batcher/src/main/java/io/optimism/type/BlockId.java +++ b/hildr-batcher/src/main/java/io/optimism/type/BlockId.java @@ -38,4 +38,9 @@ public record BlockId(String hash, BigInteger number) { public static BlockId from(EthBlock.Block block) { return new BlockId(block.getHash(), block.getNumber()); } + + @Override + public String toString() { + return "BlockId{" + "hash='" + hash + '\'' + ", number=" + number + '}'; + } } diff --git a/hildr-batcher/src/main/java/io/optimism/type/L1BlockInfo.java b/hildr-batcher/src/main/java/io/optimism/type/L1BlockInfo.java index 6ab01040..642d5615 100644 --- a/hildr-batcher/src/main/java/io/optimism/type/L1BlockInfo.java +++ b/hildr-batcher/src/main/java/io/optimism/type/L1BlockInfo.java @@ -69,7 +69,7 @@ public static L1BlockInfo from(byte[] data) { String.format("data is unexpected length: %d", data == null ? 0 : data.length)); } if (!Objects.deepEquals(ArrayUtils.subarray(data, 0, 4), SIGNATURE_BYTES)) { - throw new ParseBlockException(""); + throw new ParseBlockException("not equals signature bytes"); } BigInteger number = Numeric.toBigInt(data, 4, 32); BigInteger time = Numeric.toBigInt(data, 36, 32); diff --git a/hildr-batcher/src/main/java/io/optimism/type/L2BlockRef.java b/hildr-batcher/src/main/java/io/optimism/type/L2BlockRef.java index 17752a42..cfd7f475 100644 --- a/hildr-batcher/src/main/java/io/optimism/type/L2BlockRef.java +++ b/hildr-batcher/src/main/java/io/optimism/type/L2BlockRef.java @@ -18,6 +18,7 @@ import java.math.BigInteger; import java.util.Objects; +import org.web3j.protocol.core.methods.response.EthBlock; /** * L2 block brief information. @@ -48,6 +49,23 @@ public BlockId toId() { return new BlockId(hash, number); } + /** + * Create a L2BlockRef instance from EthBlock.Block and L1BlockInfo + * + * @param block block info + * @param l1Info l1 block info + * @return L2BlockRef instance + */ + public static L2BlockRef fromBlockAndL1Info(EthBlock.Block block, L1BlockInfo l1Info) { + return new L2BlockRef( + block.getHash(), + block.getNumber(), + block.getParentHash(), + block.getTimestamp(), + new BlockId(l1Info.blockHash(), l1Info.number()), + l1Info.sequenceNumber()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hildr-utilities/src/main/java/io/optimism/utilities/derive/stages/Frame.java b/hildr-utilities/src/main/java/io/optimism/utilities/derive/stages/Frame.java index 80fd59bf..e23b378d 100644 --- a/hildr-utilities/src/main/java/io/optimism/utilities/derive/stages/Frame.java +++ b/hildr-utilities/src/main/java/io/optimism/utilities/derive/stages/Frame.java @@ -75,6 +75,20 @@ public String code() { return String.valueOf(Objects.hashCode(channelId, frameNumber)); } + /** + * Create a new Frame instance. + * + * @param id channel id + * @param frameNumber frame number + * @param data frame data + * @param isLastFrame is last frame + * @return a new Frame instance + */ + public static Frame create(BigInteger id, int frameNumber, byte[] data, boolean isLastFrame) { + var dataLen = data == null ? 0 : data.length; + return new Frame(id, frameNumber, dataLen, data, isLastFrame, null); + } + /** * Encode this Frame to bytes. * diff --git a/hildr-utilities/src/main/java/io/optimism/utilities/gas/GasCalculator.java b/hildr-utilities/src/main/java/io/optimism/utilities/gas/GasCalculator.java index 8eddb07c..29ad4e6f 100644 --- a/hildr-utilities/src/main/java/io/optimism/utilities/gas/GasCalculator.java +++ b/hildr-utilities/src/main/java/io/optimism/utilities/gas/GasCalculator.java @@ -16,6 +16,8 @@ package io.optimism.utilities.gas; +import java.math.BigInteger; + /** * Gas util. * @@ -28,7 +30,8 @@ public class GasCalculator { /** * Private Constructor of GasCalculator. */ - private GasCalculator() {} + private GasCalculator() { + } public static final long TX_GAS_CONTRACT_CREATION = 53000L; @@ -45,11 +48,11 @@ private GasCalculator() {} /** * Calculator gas fee but exclude effective of AccessList. * - * @param data Tx data + * @param data Tx data * @param isContractCreation Is contract creation - * @param isHomestead Is home stead - * @param isEIP2028 Is EIP2028 - * @param isEIP3860 Is EIP3860 + * @param isHomestead Is home stead + * @param isEIP2028 Is EIP2028 + * @param isEIP3860 Is EIP3860 * @return Intrinsic gas */ public static long intrinsicGasWithoutAccessList( @@ -81,6 +84,19 @@ public static long intrinsicGasWithoutAccessList( return gas; } + /** + * Calculate gas fee cap. + * + * @param baseFee base fee + * @param gasTipCap gas tip cap + * @return gas fee + */ + public static BigInteger calcGasFeeCap(BigInteger baseFee, BigInteger gasTipCap) { + return gasTipCap.add( + baseFee.multiply(BigInteger.TWO) + ); + } + private static long toWordSize(int size) { return (size + 31L) / 32L; }