Skip to content

Commit

Permalink
record batcher metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkAfCod authored and GrapeBaBa committed Oct 16, 2023
1 parent d0c7081 commit 1e4ce31
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.optimism.batcher.publisher.ChannelDataPublisher;
import io.optimism.batcher.publisher.PublisherConfig;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockRef;
import io.optimism.utilities.derive.stages.Frame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,8 +51,6 @@ public class BatcherSubmitter extends AbstractExecutionThreadService {

private volatile boolean isShutdownTriggered = false;

private L1BlockRef lastL1Tip;

/**
* Constructor of BatcherSubmitter.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.optimism.batcher.channel;

import io.optimism.batcher.config.Config;
import io.optimism.batcher.telemetry.BatcherMetrics;

/**
* ChannelConfig class.
Expand All @@ -27,6 +28,7 @@
* @param maxFrameSize The maximum byte-size a frame can have.
* @param seqWindowSize The maximum byte-size a frame can have.
* @param subSafetyMargin The maximum byte-size a frame can have.
* @param metrics Batcher metrics
* @author thinkAfCod
* @since 0.1.1
*/
Expand All @@ -35,7 +37,8 @@ public record ChannelConfig(
long maxChannelDuration,
int maxFrameSize,
long seqWindowSize,
long subSafetyMargin) {
long subSafetyMargin,
BatcherMetrics metrics) {

/**
* Create a ChannelConfig instance from Config instance.
Expand All @@ -44,6 +47,6 @@ public record ChannelConfig(
* @return ChannelConfig instance
*/
public static ChannelConfig from(Config config) {
return new ChannelConfig(30000, 0, 120_000, 3600, 10);
return new ChannelConfig(30000, 0, 120_000, 3600, 10, config.metrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.optimism.batcher.compressor.Compressor;
import io.optimism.batcher.exception.UnsupportedException;
import io.optimism.batcher.telemetry.BatcherMetrics;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockInfo;
import io.optimism.utilities.derive.stages.Batch;
Expand Down Expand Up @@ -58,25 +59,27 @@ public class ChannelImpl implements Channel {

private final ChannelConfig chConfig;

private final BatcherMetrics metrics;

private final BigInteger seqWindowTimeout;

private final BigInteger id;

private AtomicInteger frameNumber;
private final AtomicInteger frameNumber;

private List<Frame> outputFrames;
private final List<Frame> outputFrames;

private Map<String, Frame> pendingTxs;
private final Map<String, Frame> pendingTxs;

private Map<String, BlockId> confirmedTxs;
private final Map<String, BlockId> confirmedTxs;

private List<EthBlock.Block> blocks;
private final List<EthBlock.Block> blocks;

private BigInteger timeoutBlock;

private Compressor compressor;
private final Compressor compressor;

private AtomicInteger rlpLength;
private final AtomicInteger rlpLength;

private volatile boolean isFull;

Expand All @@ -90,6 +93,7 @@ public class ChannelImpl implements Channel {
*/
public ChannelImpl(ChannelConfig chConfig, Compressor compressor) {
this.chConfig = chConfig;
this.metrics = chConfig.metrics();
this.seqWindowTimeout =
BigInteger.valueOf(chConfig.seqWindowSize() - chConfig.subSafetyMargin());
this.compressor = compressor;
Expand Down Expand Up @@ -175,7 +179,7 @@ public boolean hasFrame() {

@Override
public void txFailed(Frame tx) {
// todo metrics record batch tx failed.
this.metrics.recordBatchTxFailed();
var code = tx.code();
if (!this.pendingTxs.containsKey(code)) {
LOGGER.warn(
Expand All @@ -190,7 +194,7 @@ public void txFailed(Frame tx) {

@Override
public List<EthBlock.Block> txConfirmed(Frame tx, BlockId inclusionBlock) {
// todo metrics RecordBatchTxSubmitted
this.metrics.recordBatchTxSubmitted();
LOGGER.debug(
"marked tx as confirmed: chId: {}; frameNum: {}; block: {}",
tx.channelId(),
Expand All @@ -214,12 +218,12 @@ public List<EthBlock.Block> txConfirmed(Frame tx, BlockId inclusionBlock) {
.subtract(BigInteger.valueOf(chConfig.subSafetyMargin()));
this.updateTimeout(timeout);
if (this.isTimeout()) {
// todo metrics recordChannelTimeout
this.metrics.recordChannelTimedOut(tx);
LOGGER.warn("Channel timeout: chId:{}", tx.channelId());
return this.blocks;
}
if (this.isFullySubmitted()) {
// todo metrics RecordChannelFullySubmitted
this.metrics.recordChannelFullySubmitted(tx);
LOGGER.info("Channel is fully submitted: chId:{}", tx.channelId());
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.optimism.batcher.compressor.CompressorConfig;
import io.optimism.batcher.compressor.Compressors;
import io.optimism.batcher.telemetry.BatcherMetrics;
import io.optimism.type.BlockId;
import io.optimism.type.L1BlockInfo;
import io.optimism.type.L2BlockRef;
Expand Down Expand Up @@ -45,6 +46,8 @@ public class ChannelManager {

private final ChannelConfig chConfig;

private final BatcherMetrics metrics;

private final CompressorConfig compressorConfig;

private List<EthBlock.Block> blocks;
Expand All @@ -67,6 +70,7 @@ public class ChannelManager {
*/
public ChannelManager(final ChannelConfig chConfig, final CompressorConfig compressorConfig) {
this.chConfig = chConfig;
this.metrics = chConfig.metrics();
this.compressorConfig = compressorConfig;
this.blocks = new ArrayList<>(256);
this.channels = new ArrayList<>(256);
Expand All @@ -85,9 +89,9 @@ public void addL2Block(EthBlock.Block block) {
if (!StringUtils.isEmpty(latestBlockHash) && !latestBlockHash.equals(block.getParentHash())) {
throw new ReorgException("block does not extend existing chain");
}
// todo metrics pending block
this.blocks.add(block);
this.latestBlockHash = block.getHash();
this.metrics.recordL2BlockInPendingQueue(block);
}

/**
Expand Down Expand Up @@ -155,7 +159,7 @@ public void txFailed(final Frame tx) {
* @param inclusionBlock inclusion block id
*/
public void txConfirmed(final Frame tx, final BlockId inclusionBlock) {
// todo metrics RecordBatchTxSubmitted
this.metrics.recordBatchTxSubmitted();
LOGGER.debug(
"marked transaction as confirmed: chId: {}; frameNum: {};block: {}",
tx.channelId(),
Expand Down Expand Up @@ -222,21 +226,21 @@ private Channel openChannel(final BlockId l1Head) {
Channel ch = new ChannelImpl(this.chConfig, Compressors.create(this.compressorConfig));
LOGGER.info(
"Created a channel: id:{}, l1Head: {}, blocksPending:{}", ch, l1Head, this.blocks.size());
// todo metrics record opened channel
this.metrics.recordChannelOpened(null, this.blocks.size());
return ch;
}

private void pushBlocks(final Channel lastChannel) {
int blocksAdded = 0;
L2BlockRef unused = null;
L2BlockRef l2Ref = null;
try {
for (final EthBlock.Block block : this.blocks) {
final L1BlockInfo l1Info = lastChannel.addBlock(block);
unused = L2BlockRef.fromBlockAndL1Info(block, l1Info);
// todo metrics recordL2BlockInChannel
l2Ref = L2BlockRef.fromBlockAndL1Info(block, l1Info);
if (latestChannel.isFull()) {
break;
}
this.metrics.recordL2BlockInChannel(block);
blocksAdded += 1;
}
} catch (ChannelException e) {
Expand All @@ -252,7 +256,12 @@ private void pushBlocks(final Channel lastChannel) {
this.blocks = this.blocks.stream().skip(blocksAdded).collect(Collectors.toList());
}

// todo metrics RecordL2BlocksAdded
this.metrics.recordL2BlocksAdded(
l2Ref,
blocksAdded,
this.blocks.size(),
this.latestChannel.inputBytesLength(),
this.latestChannel.readyBytesLength());

LOGGER.debug(
"Added blocks to channel:"
Expand Down
89 changes: 85 additions & 4 deletions hildr-batcher/src/main/java/io/optimism/batcher/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.optimism.batcher.config;

import io.optimism.batcher.telemetry.BatcherMetrics;
import java.util.Objects;

/**
* Batcher config.
*
Expand All @@ -27,9 +30,10 @@
* @param subSafetyMargin Sub-safety margin
* @param pollInterval Milliseconds of poll interval
* @param maxL1TxSize Max L1 Tx Size
* @param targetFrameSize Max L1 Tx Size
* @param targetNumFrames Max L1 Tx Size
* @param approxComprRatio Max L1 Tx Size
* @param targetFrameSize The target of frame size
* @param targetNumFrames The target of frame number
* @param approxComprRatio Compress ratio
* @param metrics Batcher metrics
* @author thinkAfCod
* @since 0.1.1
*/
Expand All @@ -44,4 +48,81 @@ public record Config(
Long maxL1TxSize,
Integer targetFrameSize,
Integer targetNumFrames,
String approxComprRatio) {}
String approxComprRatio,
BatcherMetrics metrics) {

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof Config that)) {
return false;
}
return Objects.equals(this.l1RpcUrl, that.l1RpcUrl)
&& Objects.equals(this.l2RpcUrl, that.l2RpcUrl)
&& Objects.equals(this.rollupRpcUrl, that.rollupRpcUrl)
&& Objects.equals(this.l1Signer, that.l1Signer)
&& Objects.equals(this.batchInboxAddress, that.batchInboxAddress)
&& Objects.equals(this.subSafetyMargin, that.subSafetyMargin)
&& Objects.equals(this.pollInterval, that.pollInterval)
&& Objects.equals(this.maxL1TxSize, that.maxL1TxSize)
&& Objects.equals(this.targetFrameSize, that.targetFrameSize)
&& Objects.equals(this.targetNumFrames, that.targetNumFrames)
&& Objects.equals(this.approxComprRatio, that.approxComprRatio);
}

@Override
public int hashCode() {
return Objects.hash(
l1RpcUrl,
l2RpcUrl,
rollupRpcUrl,
l1Signer,
batchInboxAddress,
subSafetyMargin,
pollInterval,
maxL1TxSize,
targetFrameSize,
targetNumFrames,
approxComprRatio);
}

@Override
public String toString() {
return "Config["
+ "l1RpcUrl="
+ l1RpcUrl
+ ", "
+ "l2RpcUrl="
+ l2RpcUrl
+ ", "
+ "rollupRpcUrl="
+ rollupRpcUrl
+ ", "
+ "l1Signer="
+ l1Signer
+ ", "
+ "batchInboxAddress="
+ batchInboxAddress
+ ", "
+ "subSafetyMargin="
+ subSafetyMargin
+ ", "
+ "pollInterval="
+ pollInterval
+ ", "
+ "maxL1TxSize="
+ maxL1TxSize
+ ", "
+ "targetFrameSize="
+ targetFrameSize
+ ", "
+ "targetNumFrames="
+ targetNumFrames
+ ", "
+ "approxComprRatio="
+ approxComprRatio
+ ']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.optimism.batcher.channel.ReorgException;
import io.optimism.batcher.exception.Web3jCallException;
import io.optimism.batcher.telemetry.BatcherMetrics;
import io.optimism.type.BlockId;
import io.optimism.type.Genesis;
import io.optimism.type.L1BlockInfo;
Expand Down Expand Up @@ -65,6 +66,8 @@ public class BlockLoader implements Closeable {

private final Web3jService rollupService;

private final BatcherMetrics metrics;

private final Consumer<EthBlock.Block> blockConsumer;

private BlockId latestLoadedBlock;
Expand All @@ -75,13 +78,14 @@ public class BlockLoader implements Closeable {
* Constructor of BlockLoader.
*
* @param config LoaderConfig instance
* @param blockConsumer consumer block loaded from L2
* @param blockConsumer Consumer block loaded from L2
*/
public BlockLoader(LoaderConfig config, Consumer<EthBlock.Block> blockConsumer) {
this.l2Client = Web3jProvider.createClient(config.l2RpcUrl());
Tuple2<Web3j, Web3jService> tuple = Web3jProvider.create(config.rollupUrl());
this.rollupClient = tuple.component1();
this.rollupService = tuple.component2();
this.metrics = config.metrics();
this.blockConsumer = blockConsumer;
this.rollupConfig = loadRollConfig();
}
Expand Down Expand Up @@ -140,8 +144,8 @@ private void loadBlocksIntoState() {
if (lastBlock == null) {
throw new BlockLoaderException("get latest block failed");
}
var ignore = l2BlockToBlockRef(lastBlock, rollupConfig.genesis());
// todo metrics.RecordL2BlocksLoaded l2Ref
var l2Ref = l2BlockToBlockRef(lastBlock, rollupConfig.genesis());
this.metrics.recordL2BlocksLoaded(l2Ref);
}

private Tuple2<BlockId, BigInteger> calculateL2BlockRangeToStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
package io.optimism.batcher.loader;

import io.optimism.batcher.config.Config;
import io.optimism.batcher.telemetry.BatcherMetrics;

/**
* L2 loader config.
*
* @param l2RpcUrl L2 rpc url
* @param rollupUrl op-rollup node url
* @param metrics Batcher metrics
* @author thinkAfCod
* @since 0.1.1
*/
public record LoaderConfig(String l2RpcUrl, String rollupUrl) {
public record LoaderConfig(String l2RpcUrl, String rollupUrl, BatcherMetrics metrics) {

/**
* Create a LoaderConfig instance from Config instance.
Expand All @@ -35,6 +37,6 @@ public record LoaderConfig(String l2RpcUrl, String rollupUrl) {
* @return LoaderConfig instance
*/
public static LoaderConfig from(Config config) {
return new LoaderConfig(config.l2RpcUrl(), config.rollupRpcUrl());
return new LoaderConfig(config.l2RpcUrl(), config.rollupRpcUrl(), config.metrics());
}
}
Loading

0 comments on commit 1e4ce31

Please sign in to comment.