Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logic of channel publish #43

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package io.optimism.batcher;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import io.optimism.batcher.channel.ChannelConfig;
import io.optimism.batcher.channel.ChannelManager;
import io.optimism.batcher.compressor.CompressorConfig;
import io.optimism.batcher.config.Config;
import io.optimism.batcher.exception.BatcherExecutionException;
import io.optimism.batcher.loader.BlockLoader;
import io.optimism.batcher.loader.LoaderConfig;
import io.optimism.batcher.publisher.ChannelDataPublisher;
import io.optimism.batcher.publisher.PublisherConfig;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockRef;
import io.optimism.utilities.derive.stages.Frame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
Expand All @@ -47,19 +52,22 @@ public class BatcherSubmitter extends AbstractExecutionThreadService {

private volatile boolean isShutdownTriggered = false;

private L1BlockRef lastL1Tip;

/**
* Constructor of BatcherSubmitter.
*
* @param config BatcherSubmitter config
*/
public BatcherSubmitter(Config config) {
this.config = config;
this.channelManager = new ChannelManager();
this.channelManager =
new ChannelManager(ChannelConfig.from(config), CompressorConfig.from(config));
this.blockLoader = new BlockLoader(LoaderConfig.from(config), this.channelManager::addL2Block);

this.channelPublisher =
new ChannelDataPublisher(
PublisherConfig.from(config, this.blockLoader.getRollConfig().batchInboxAddress()),
PublisherConfig.from(config, this.blockLoader.getRollConfig()),
this.channelManager::txData,
this::handleReceipt);
}
Expand All @@ -72,16 +80,17 @@ private void trySubmitBatchData() {
Thread.sleep(config.pollInterval());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BatcherExecutionException(e);
throw new BatcherExecutionException("Batcher thread has been interrupted", e);
}
}
}

private void handleReceipt(TransactionReceipt receipt) {
private void handleReceipt(Frame tx, TransactionReceipt receipt) {
if (receipt.isStatusOK()) {
// todo this.channelManager.txConfirmed();
this.channelManager.txConfirmed(
tx, new BlockId(receipt.getBlockHash(), receipt.getBlockNumber()));
} else {
// todo this.channelManager.txFailed();
this.channelManager.txFailed(tx);
}
}

Expand Down
134 changes: 116 additions & 18 deletions hildr-batcher/src/main/java/io/optimism/batcher/channel/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,132 @@

package io.optimism.batcher.channel;

import org.apache.commons.lang3.ArrayUtils;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockInfo;
import io.optimism.utilities.derive.stages.Frame;
import java.io.Closeable;
import java.math.BigInteger;
import java.util.List;
import org.web3j.protocol.core.methods.response.EthBlock;

/**
* Channel interface. cache batch submit data.
*
* @author thinkAfCod
* @since 0.1.1
*/
public interface Channel {
public interface Channel extends Closeable {

/** Derivation version. */
byte DERIVATION_VERSION_0 = 0;
/**
* Add block to channel, block will be parsed and be compressed.
*
* @param block Block on L2
* @return l1 block info
*/
L1BlockInfo addBlock(EthBlock.Block block);

/** Split channel data to frames. */
void splitToFrame();

/**
* Get next frame data should be published to l1. The data added to the channel will be
* transformed into multiple frames.
*
* @return TxData instance contains frame data
*/
Frame nextFrame();

/**
* Push frame to current channel.
*
* @param frame TxData instance
*/
void pushFrame(Frame frame);

/**
* Get Total frames.
*
* @return total value.
*/
int totalFrames();

/**
* Get count of pending frames.
*
* @return count of pending frames
*/
int pendingFrames();

/**
* If has frame.
*
* @return true if has data of frame, otherwise false.
*/
boolean hasFrame();

/**
* Has none pending tx data.
*
* @return true if has none pending tx data, otherwise false.
*/
boolean noneSubmitted();

/**
* Check is tx data fully submitted.
*
* @return ture if fully submitted, otherwise false.
*/
boolean isFullySubmitted();

/**
* Process failed tx that belong to the channel. Will push tx back to pending queue.
*
* @param tx failed tx data
*/
void txFailed(Frame tx);

/**
* Process confirmed tx that belong to the channel.
*
* @param tx confirmed tx data
* @param inclusionBlock tx data be inclusion block number
* @return if channel was timeout, the blocks added to the channel will be returned.
*/
List<EthBlock.Block> txConfirmed(Frame tx, BlockId inclusionBlock);

/**
* If channel touch limit of frame data.
*
* @return true if full of data, otherwise false.
*/
boolean isFull();

/**
* If the channel data expired at the specified block height.
*
* @param blockNumber block height number
* @return true if timeout,otherwise false.
*/
boolean isTimeout(BigInteger blockNumber);

/**
* Update channel data expired at the specified block height.
*
* @param blockNumber block height number
*/
void updateTimeout(final BigInteger blockNumber);

/**
* Input bytes data.
*
* @return input bytes
*/
int inputBytesLength();

/**
* Channel Tx Data class.
* Ready to publishing bytes.
*
* @param data L2 block data that will send to L1
* @param channelId channelId
* @param frameNumber channel frame number
* @return ready to publishing bytes
*/
record TxData(byte[] data, byte[] channelId, int frameNumber) {
/**
* Get tx bytes.
*
* @return tx bytes
*/
public byte[] txBytes() {
return ArrayUtils.addAll(new byte[] {DERIVATION_VERSION_0}, data());
}
}
int readyBytesLength();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
*
* @param channelTimeout The maximum number of L1 blocks that the inclusion transactions of a
* channel's frames can span.
* @param maxChannelDuration If 0, duration checks are disabled.
* @param maxChannelDuration Timeout of max block number.If 0, duration checks are disabled.
* @param maxFrameSize The maximum byte-size a frame can have.
* @param seqWindowSize The maximum byte-size a frame can have.
* @param subSafetyMargin The maximum byte-size a frame can have.
* @author thinkAfCod
* @since 0.1.1
*/
public record ChannelConfig(long channelTimeout, long maxChannelDuration, int maxFrameSize) {
public record ChannelConfig(
long channelTimeout,
long maxChannelDuration,
int maxFrameSize,
long seqWindowSize,
long subSafetyMargin) {

/**
* Create a ChannelConfig instance from Config instance.
Expand All @@ -37,6 +44,6 @@ public record ChannelConfig(long channelTimeout, long maxChannelDuration, int ma
* @return ChannelConfig instance
*/
public static ChannelConfig from(Config config) {
return new ChannelConfig(30000, 0, 120_000);
return new ChannelConfig(30000, 0, 120_000, 3600, 10);
}
}
Loading
Loading