Skip to content

Commit

Permalink
feat: add compression mode support for BlockAsFileReader (#449)
Browse files Browse the repository at this point in the history
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
  • Loading branch information
ata-nas authored Jan 9, 2025
1 parent c303fa0 commit 7a0420a
Show file tree
Hide file tree
Showing 26 changed files with 717 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import javax.inject.Singleton;
Expand All @@ -55,16 +54,14 @@ public interface PersistenceInjectionModule {
@Provides
@Singleton
static BlockWriter<List<BlockItemUnparsed>> providesBlockWriter(
@NonNull final PersistenceStorageConfig config,
@NonNull final BlockNodeContext blockNodeContext,
@NonNull final BlockRemover blockRemover,
@NonNull final BlockPathResolver blockPathResolver,
@NonNull final Compression compression) {
Objects.requireNonNull(blockRemover);
Objects.requireNonNull(blockPathResolver);
final StorageType persistenceType = blockNodeContext
.configuration()
.getConfigData(PersistenceStorageConfig.class)
.type();
final StorageType persistenceType = config.type();
try {
return switch (persistenceType) {
case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFileWriter.of(blockNodeContext, blockPathResolver, compression);
Expand All @@ -90,10 +87,12 @@ static BlockWriter<List<BlockItemUnparsed>> providesBlockWriter(
@Provides
@Singleton
static BlockReader<BlockUnparsed> providesBlockReader(
@NonNull final PersistenceStorageConfig config, @NonNull final BlockPathResolver blockPathResolver) {
@NonNull final PersistenceStorageConfig config,
@NonNull final BlockPathResolver blockPathResolver,
@NonNull final Compression compression) {
final StorageType persistenceType = config.type();
return switch (persistenceType) {
case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFileReader.of(blockPathResolver);
case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFileReader.of(compression, blockPathResolver);
case BLOCK_AS_LOCAL_DIRECTORY -> BlockAsLocalDirReader.of(config);
case NO_OP -> NoOpBlockReader.newInstance();
};
Expand Down Expand Up @@ -131,10 +130,9 @@ static BlockRemover providesBlockRemover(
@Singleton
static BlockPathResolver providesPathResolver(@NonNull final PersistenceStorageConfig config) {
final StorageType persistenceType = config.type();
final Path blockStorageRoot = Path.of(config.liveRootPath());
return switch (persistenceType) {
case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFilePathResolver.of(blockStorageRoot);
case BLOCK_AS_LOCAL_DIRECTORY -> BlockAsLocalDirPathResolver.of(blockStorageRoot);
case BLOCK_AS_LOCAL_FILE -> BlockAsLocalFilePathResolver.of(config);
case BLOCK_AS_LOCAL_DIRECTORY -> BlockAsLocalDirPathResolver.of(config);
case NO_OP -> NoOpBlockPathResolver.newInstance();
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,28 @@ public enum CompressionType {
* This type of compression is used to compress the blocks using the
* `Zstandard` algorithm.
*/
ZSTD(0, 20),
ZSTD(0, 20, ".zstd"),
/**
* This type means no compression will be done.
*/
NONE(Integer.MIN_VALUE, Integer.MAX_VALUE);
NONE(Integer.MIN_VALUE, Integer.MAX_VALUE, "");

private final int minCompressionLevel;
private final int maxCompressionLevel;
private final String fileExtension;

CompressionType(final int minCompressionLevel, final int maxCompressionLevel) {
CompressionType(final int minCompressionLevel, final int maxCompressionLevel, final String fileExtension) {
this.minCompressionLevel = minCompressionLevel;
this.maxCompressionLevel = maxCompressionLevel;
this.fileExtension = fileExtension;
}

public void verifyCompressionLevel(final int levelToCheck) {
Preconditions.requireInRange(levelToCheck, minCompressionLevel, maxCompressionLevel);
}

public String getFileExtension() {
return fileExtension;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
Expand All @@ -24,6 +25,20 @@ public interface Compression {
@NonNull
OutputStream wrap(@NonNull final OutputStream streamToWrap) throws IOException;

/**
* This method takes a valid, {@code non-null} {@link InputStream} instance
* and wraps it with the specific compression algorithm implementation. The
* resulting {@link InputStream} is then returned.
*
* @param streamToWrap a valid {@code non-null} {@link InputStream} to wrap
* @return a {@code non-null} {@link InputStream} that wraps the provided
* {@link InputStream} with the specific compression algorithm
* implementation
* @throws IOException if an I/O exception occurs
*/
@NonNull
InputStream wrap(@NonNull final InputStream streamToWrap) throws IOException;

/**
* This method aims to return a valid, {@code non-blank} {@link String} that
* represents the file extension for the given specific implementation,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.compression;

import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;

Expand Down Expand Up @@ -39,9 +42,21 @@ public OutputStream wrap(@NonNull final OutputStream streamToWrap) {
return Objects.requireNonNull(streamToWrap);
}

/**
* This implementation does not decompress the data. It uses no compression
* algorithm, but simply generates a stream that reads the data from it`s
* destination, as it has been written.
* @see Compression#wrap(OutputStream) for API contract
*/
@NonNull
@Override
public InputStream wrap(@NonNull final InputStream streamToWrap) throws IOException {
return Objects.requireNonNull(streamToWrap);
}

@NonNull
@Override
public String getCompressionFileExtension() {
return "";
return CompressionType.NONE.getFileExtension();
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.compression;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;

Expand Down Expand Up @@ -39,21 +42,21 @@ public static ZstdCompression of(@NonNull final PersistenceStorageConfig config)
return new ZstdCompression(config);
}

/**
* This implementation uses the compression
* algorithm, but simply generates a stream that writes the data to it`s
* destination, as it is received.
* @see Compression#wrap(OutputStream) for API contract
*/
@NonNull
@Override
public OutputStream wrap(@NonNull final OutputStream streamToWrap) throws IOException {
return new ZstdOutputStream(Objects.requireNonNull(streamToWrap), compressionLevel);
}

@NonNull
@Override
public InputStream wrap(@NonNull final InputStream streamToWrap) throws IOException {
return new ZstdInputStream(Objects.requireNonNull(streamToWrap));
}

@NonNull
@Override
public String getCompressionFileExtension() {
return ".zstd";
return CompressionType.ZSTD.getFileExtension();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
package com.hedera.block.server.persistence.storage.path;

import com.hedera.block.common.utils.Preconditions;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.Optional;

/**
* A Block path resolver for block-as-dir.
Expand All @@ -15,30 +17,58 @@ public final class BlockAsLocalDirPathResolver implements BlockPathResolver {
/**
* Constructor.
*
* @param liveRootPath valid, {@code non-null} instance of {@link Path}
* that points to the live root of the block storage
* @param config valid, {@code non-null} instance of
* {@link PersistenceStorageConfig} used for initializing the resolver
*/
private BlockAsLocalDirPathResolver(@NonNull final Path liveRootPath) {
this.liveRootPath = Objects.requireNonNull(liveRootPath);
private BlockAsLocalDirPathResolver(@NonNull final PersistenceStorageConfig config) {
this.liveRootPath = Path.of(config.liveRootPath());
}

/**
* This method creates and returns a new instance of
* {@link BlockAsLocalDirPathResolver}.
*
* @param liveRootPath valid, {@code non-null} instance of {@link Path}
* that points to the live root of the block storage
* @param config valid, {@code non-null} instance of
* {@link PersistenceStorageConfig} used for initializing the resolver
* @return a new, fully initialized instance of
* {@link BlockAsLocalDirPathResolver}
*/
public static BlockAsLocalDirPathResolver of(@NonNull final Path liveRootPath) {
return new BlockAsLocalDirPathResolver(liveRootPath);
public static BlockAsLocalDirPathResolver of(@NonNull final PersistenceStorageConfig config) {
return new BlockAsLocalDirPathResolver(config);
}

@NonNull
@Override
public Path resolvePathToBlock(final long blockNumber) {
public Path resolveLiveRawPathToBlock(final long blockNumber) {
Preconditions.requireWhole(blockNumber);
return liveRootPath.resolve(String.valueOf(blockNumber));
}

@NonNull
@Override
public Path resolveArchiveRawPathToBlock(final long blockNumber) {
Preconditions.requireWhole(blockNumber);
throw new UnsupportedOperationException("Not implemented yet");
}

/**
* The block-as-local-dir implementation does not support
* compression/decompression. A block will only be found if it exists and is
* not compressed.
*
* @param blockNumber the block number to find the block file for
* @return an {@link Optional} containing the path to the block file if it
* exists, otherwise an empty {@link Optional}
*/
@NonNull
@Override
public Optional<Path> findBlock(final long blockNumber) {
Preconditions.requireWhole(blockNumber);
final Path liveRawRootPath = resolveLiveRawPathToBlock(blockNumber);
if (Files.exists(liveRawRootPath)) {
return Optional.of(liveRawRootPath);
} else {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,50 +1,79 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.persistence.storage.path;

import com.hedera.block.common.utils.FileUtilities;
import com.hedera.block.common.utils.Preconditions;
import com.hedera.block.server.Constants;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig.CompressionType;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.Optional;

/**
* A Block path resolver for block-as-file.
*/
public final class BlockAsLocalFilePathResolver implements BlockPathResolver {
private static final int MAX_LONG_DIGITS = 19;
private final Path liveRootPath;
private final CompressionType compressionType;

/**
* Constructor.
*
* @param liveRootPath valid, {@code non-null} instance of {@link Path}
* that points to the live root of the block storage
* @param config valid, {@code non-null} instance of
* {@link PersistenceStorageConfig} used for initializing the resolver
*/
private BlockAsLocalFilePathResolver(@NonNull final Path liveRootPath) {
this.liveRootPath = Objects.requireNonNull(liveRootPath);
private BlockAsLocalFilePathResolver(@NonNull final PersistenceStorageConfig config) {
this.liveRootPath = Path.of(config.liveRootPath());
this.compressionType = config.compression();
}

/**
* This method creates and returns a new instance of
* {@link BlockAsLocalFilePathResolver}.
*
* @param liveRootPath valid, {@code non-null} instance of {@link Path}
* that points to the live root of the block storage
* @return a new, fully initialized instance of
* {@link BlockAsLocalFilePathResolver}
* @param config valid, {@code non-null} instance of
* {@link PersistenceStorageConfig} used for initializing the resolver
* @return a new, fully initialized instance of {@link BlockAsLocalFilePathResolver}
*/
public static BlockAsLocalFilePathResolver of(@NonNull final Path liveRootPath) {
return new BlockAsLocalFilePathResolver(liveRootPath);
public static BlockAsLocalFilePathResolver of(@NonNull final PersistenceStorageConfig config) {
return new BlockAsLocalFilePathResolver(config);
}

@NonNull
@Override
public Path resolvePathToBlock(final long blockNumber) {
public Path resolveLiveRawPathToBlock(final long blockNumber) {
Preconditions.requireWhole(blockNumber);
final String rawBlockNumber = String.format("%0" + MAX_LONG_DIGITS + "d", blockNumber);
final String[] blockPath = rawBlockNumber.split("");
final String blockFileName = rawBlockNumber.concat(Constants.BLOCK_FILE_EXTENSION);
blockPath[blockPath.length - 1] = blockFileName;
return Path.of(liveRootPath.toString(), blockPath);
}

@NonNull
@Override
public Path resolveArchiveRawPathToBlock(final long blockNumber) {
Preconditions.requireWhole(blockNumber);
throw new UnsupportedOperationException("Not implemented yet");
}

@NonNull
@Override
public Optional<Path> findBlock(final long blockNumber) {
Preconditions.requireWhole(blockNumber);
final Path liveRawRootPath = resolveLiveRawPathToBlock(blockNumber);
final Path compressionExtendedLiveRawRootPath =
FileUtilities.appendExtension(liveRawRootPath, compressionType.getFileExtension());
if (Files.exists(compressionExtendedLiveRawRootPath)) {
return Optional.of(compressionExtendedLiveRawRootPath);
} else if (Files.exists(liveRawRootPath)) {
return Optional.of(liveRawRootPath);
} else {
return Optional.empty();
}
} // todo consider to add additional handling here, like test for other compression types (currently not existing)
// and also look for archived blocks (will be implemented in a follow-up PR)
}
Loading

0 comments on commit 7a0420a

Please sign in to comment.