Skip to content

Commit

Permalink
logic of channel publish
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod committed Aug 16, 2023
1 parent 1b53a64 commit 57dd006
Show file tree
Hide file tree
Showing 13 changed files with 963 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package io.optimism.batcher;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.optimism.batcher.channel.ChannelConfig;
import io.optimism.batcher.channel.ChannelManager;
import io.optimism.batcher.compressor.CompressorConfig;
import io.optimism.batcher.config.Config;
import io.optimism.batcher.exception.BatcherExecutionException;
import io.optimism.batcher.loader.BlockLoader;
import io.optimism.batcher.loader.LoaderConfig;
import io.optimism.batcher.publisher.ChannelDataPublisher;
import io.optimism.batcher.publisher.PublisherConfig;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockRef;
import io.optimism.utilities.derive.stages.Frame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
Expand All @@ -47,19 +52,22 @@ public class BatcherSubmitter extends AbstractExecutionThreadService {

private volatile boolean isShutdownTriggered = false;

private L1BlockRef lastL1Tip;

/**
* Constructor of BatcherSubmitter.
*
* @param config BatcherSubmitter config
*/
public BatcherSubmitter(Config config) {
this.config = config;
this.channelManager = new ChannelManager();
this.channelManager =
new ChannelManager(ChannelConfig.from(config), CompressorConfig.from(config));
this.blockLoader = new BlockLoader(LoaderConfig.from(config), this.channelManager::addL2Block);

this.channelPublisher =
new ChannelDataPublisher(
PublisherConfig.from(config, this.blockLoader.getRollConfig().batchInboxAddress()),
PublisherConfig.from(config, this.blockLoader.getRollConfig()),
this.channelManager::txData,
this::handleReceipt);
}
Expand All @@ -72,16 +80,17 @@ private void trySubmitBatchData() {
Thread.sleep(config.pollInterval());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BatcherExecutionException(e);
throw new BatcherExecutionException("Batcher thread has been interrupted", e);
}
}
}

private void handleReceipt(TransactionReceipt receipt) {
private void handleReceipt(Frame tx, TransactionReceipt receipt) {
if (receipt.isStatusOK()) {
// todo this.channelManager.txConfirmed();
this.channelManager.txConfirmed(
tx, new BlockId(receipt.getBlockHash(), receipt.getBlockNumber()));
} else {
// todo this.channelManager.txFailed();
this.channelManager.txFailed(tx);
}
}

Expand Down
134 changes: 116 additions & 18 deletions hildr-batcher/src/main/java/io/optimism/batcher/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,132 @@

package io.optimism.batcher.channel;

import org.apache.commons.lang3.ArrayUtils;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockInfo;
import io.optimism.utilities.derive.stages.Frame;
import java.io.Closeable;
import java.math.BigInteger;
import java.util.List;
import org.web3j.protocol.core.methods.response.EthBlock;

/**
* Channel interface. cache batch submit data.
*
* @author thinkAfCod
* @since 0.1.1
*/
public interface Channel {
public interface Channel extends Closeable {

/** Derivation version. */
byte DERIVATION_VERSION_0 = 0;
/**
* Add block to channel, block will be parsed and be compressed.
*
* @param block Block on L2
* @return l1 block info
*/
L1BlockInfo addBlock(EthBlock.Block block);

/** Split channel data to frames. */
void splitToFrame();

/**
* Get next frame data should be published to l1. The data added to the channel will be
* transformed into multiple frames.
*
* @return TxData instance contains frame data
*/
Frame nextFrame();

/**
* Push frame to current channel.
*
* @param frame TxData instance
*/
void pushFrame(Frame frame);

/**
* Get Total frames.
*
* @return total value.
*/
int totalFrames();

/**
* Get count of pending frames.
*
* @return count of pending frames
*/
int pendingFrames();

/**
* If has frame.
*
* @return true if has data of frame, otherwise false.
*/
boolean hasFrame();

/**
* Has none pending tx data.
*
* @return true if has none pending tx data, otherwise false.
*/
boolean noneSubmitted();

/**
* Check is tx data fully submitted.
*
* @return ture if fully submitted, otherwise false.
*/
boolean isFullySubmitted();

/**
* Process failed tx that belong to the channel. Will push tx back to pending queue.
*
* @param tx failed tx data
*/
void txFailed(Frame tx);

/**
* Process confirmed tx that belong to the channel.
*
* @param tx confirmed tx data
* @param inclusionBlock tx data be inclusion block number
* @return if channel was timeout, the blocks added to the channel will be returned.
*/
List<EthBlock.Block> txConfirmed(Frame tx, BlockId inclusionBlock);

/**
* If channel touch limit of frame data.
*
* @return true if full of data, otherwise false.
*/
boolean isFull();

/**
* If the channel data expired at the specified block height.
*
* @param blockNumber block height number
* @return true if timeout,otherwise false.
*/
boolean isTimeout(BigInteger blockNumber);

/**
* Update channel data expired at the specified block height.
*
* @param blockNumber block height number
*/
void updateTimeout(final BigInteger blockNumber);

/**
* Input bytes data.
*
* @return input bytes
*/
int inputBytesLength();

/**
* Channel Tx Data class.
* Ready to publishing bytes.
*
* @param data L2 block data that will send to L1
* @param channelId channelId
* @param frameNumber channel frame number
* @return ready to publishing bytes
*/
record TxData(byte[] data, byte[] channelId, int frameNumber) {
/**
* Get tx bytes.
*
* @return tx bytes
*/
public byte[] txBytes() {
return ArrayUtils.addAll(new byte[] {DERIVATION_VERSION_0}, data());
}
}
int readyBytesLength();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
*
* @param channelTimeout The maximum number of L1 blocks that the inclusion transactions of a
* channel's frames can span.
* @param maxChannelDuration If 0, duration checks are disabled.
* @param maxChannelDuration Timeout of max block number.If 0, duration checks are disabled.
* @param maxFrameSize The maximum byte-size a frame can have.
* @param seqWindowSize The maximum byte-size a frame can have.
* @param subSafetyMargin The maximum byte-size a frame can have.
* @author thinkAfCod
* @since 0.1.1
*/
public record ChannelConfig(long channelTimeout, long maxChannelDuration, int maxFrameSize) {
public record ChannelConfig(
long channelTimeout,
long maxChannelDuration,
int maxFrameSize,
long seqWindowSize,
long subSafetyMargin) {

/**
* Create a ChannelConfig instance from Config instance.
Expand All @@ -37,6 +44,6 @@ public record ChannelConfig(long channelTimeout, long maxChannelDuration, int ma
* @return ChannelConfig instance
*/
public static ChannelConfig from(Config config) {
return new ChannelConfig(30000, 0, 120_000);
return new ChannelConfig(30000, 0, 120_000, 3600, 10);
}
}
Loading

0 comments on commit 57dd006

Please sign in to comment.