Support Large Dictionary for OrcWriter
OrcWriter uses dictionary encoding for all columns until the writer's
total dictionary memory exceeds dictionaryMaxMemory - 4 MB; it then
starts abandoning dictionary encodings.

When running with a large dictionary budget (say 80 MB) and long
dictionary columns, a dictionary writer could retain hundreds of MB
before being abandoned. This change introduces new configuration
parameters to control this behavior.

1. Make the 4 MB almost-full threshold configurable. Deployments with
large dictionaries can set it to something bigger.
2. When a column's dictionary exceeds a configurable size, measure
whether the dictionary is effective and abandon it if it is not.
3. Setting 2 could affect existing writers, so introduce a third
setting controlling how often the dictionary effectiveness check runs.
It is configured to INT_MAX to preserve existing behavior (see the
sketch after this list).
Arunachalam Thirupathi committed Nov 10, 2021
1 parent fe936d0 commit 699832f
Showing 6 changed files with 348 additions and 57 deletions.
presto-orc/src/main/java/com/facebook/presto/orc/DictionaryCompressionOptimizer.java
@@ -16,7 +16,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import io.airlift.units.DataSize;
import io.airlift.units.DataSize.Unit;

import java.util.ArrayList;
import java.util.Iterator;
@@ -28,18 +27,48 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

/**
* DictionaryCompressionOptimizer has two objectives:
* 1) Bound the dictionary memory of the reader when all columns are read. The reader's dictionary
* memory should not exceed dictionaryMemoryMaxBytesHigh.
* 2) When dictionary encoding for a column produces a size comparable to direct encoding, choose
* direct encoding over dictionary encoding. Dictionary encoding/decoding is memory and CPU intensive,
* so for comparable column sizes, direct encoding is usually better.
* <p>
* Note: Dictionary writers might use more memory than reported, since they over-allocate while
* building dictionaries on incoming data. The hash table implementations in the dictionary writers
* allocate hash buckets in powers of 2, so after a million entries the over-allocation consumes a
* large amount of memory.
* <p>
* DictionaryCompressionOptimizer is controlled by the following constructor parameters:
* <p>
* 1. dictionaryMemoryMaxBytes -> Maximum size of the dictionary when all columns are read. Note: the
* writer might consume more memory due to over-allocation.
* <p>
* 2. dictionaryMemoryAlmostFullRangeBytes -> When the dictionary size exceeds dictionaryMemoryMaxBytes,
* dictionary columns are converted to direct to reduce the dictionary size. Setting a range allows the
* stripe to be flushed before the dictionary is completely full. When the dictionary size is higher than
* (dictionaryMemoryMaxBytes - dictionaryMemoryAlmostFullRangeBytes), it is considered almost full and
* ready for flushing. This setting is defined as a delta on dictionaryMemoryMaxBytes for backward
* compatibility.
* <p>
* 3. dictionaryUsefulCheckColumnSizeBytes -> Columns start with dictionary encoding, and normally the
* usefulness of a dictionary is measured only once dictionary memory is almost full. With a large
* dictionary budget (say > 40 MB), that check might happen very late, and a large dictionary might
* cause the writer to OOM because the writer over-allocates for dictionary growth. Once a column's
* dictionary grows beyond dictionaryUsefulCheckColumnSizeBytes, the usefulness check is performed,
* and the column is converted to direct if the dictionary is not useful.
* <p>
* 4. dictionaryUsefulCheckPerChunkFrequency -> The usefulness check could be costly if performed on
* every chunk. The check is performed only when a column's dictionary is above
* dictionaryUsefulCheckColumnSizeBytes, and only once per dictionaryUsefulCheckPerChunkFrequency
* chunks written.
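* <p>
* Example (illustrative values, not defaults): with dictionaryMemoryMaxBytes = 80 MB and
* dictionaryMemoryAlmostFullRangeBytes = 8 MB, dictionary memory is considered almost full once it
* exceeds 72 MB. With dictionaryUsefulCheckColumnSizeBytes = 6 MB and
* dictionaryUsefulCheckPerChunkFrequency = 4, the usefulness check runs on every 4th qualifying chunk,
* and columns whose compression ratio is below the minimum are converted to direct.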
*/
public class DictionaryCompressionOptimizer
{
private static final double DICTIONARY_MIN_COMPRESSION_RATIO = 1.25;

// Instead of waiting for the dictionary to fill completely, which would force a column into
// direct mode, close the stripe early assuming it has hit the minimum row count.
static final DataSize DICTIONARY_MEMORY_MAX_RANGE = new DataSize(4, Unit.MEGABYTE);

static final DataSize DIRECT_COLUMN_SIZE_RANGE = new DataSize(4, Unit.MEGABYTE);
static final DataSize DIRECT_COLUMN_SIZE_RANGE = new DataSize(4, MEGABYTE);

private final List<DictionaryColumnManager> allWriters;
private final List<DictionaryColumnManager> directConversionCandidates = new ArrayList<>();
@@ -49,15 +78,21 @@ public class DictionaryCompressionOptimizer
private final int stripeMaxRowCount;
private final int dictionaryMemoryMaxBytesLow;
private final int dictionaryMemoryMaxBytesHigh;
private final int dictionaryUsefulCheckColumnSizeBytes;
private final int dictionaryUsefulCheckPerChunkFrequency;

private int dictionaryMemoryBytes;
private int dictionaryUsefulCheckCounter;

public DictionaryCompressionOptimizer(
Set<? extends DictionaryColumn> writers,
int stripeMinBytes,
int stripeMaxBytes,
int stripeMaxRowCount,
int dictionaryMemoryMaxBytes)
int dictionaryMemoryMaxBytes,
int dictionaryMemoryAlmostFullRangeBytes,
int dictionaryUsefulCheckColumnSizeBytes,
int dictionaryUsefulCheckPerChunkFrequency)
{
requireNonNull(writers, "writers is null");
this.allWriters = writers.stream()
Expand All @@ -74,9 +109,14 @@ public DictionaryCompressionOptimizer(
this.stripeMaxRowCount = stripeMaxRowCount;

checkArgument(dictionaryMemoryMaxBytes >= 0, "dictionaryMemoryMaxBytes is negative");
checkArgument(dictionaryMemoryAlmostFullRangeBytes >= 0, "dictionaryMemoryAlmostFullRangeBytes is negative");
this.dictionaryMemoryMaxBytesHigh = dictionaryMemoryMaxBytes;
this.dictionaryMemoryMaxBytesLow = (int) Math.max(dictionaryMemoryMaxBytes - DICTIONARY_MEMORY_MAX_RANGE.toBytes(), 0);
this.dictionaryMemoryMaxBytesLow = Math.max(dictionaryMemoryMaxBytes - dictionaryMemoryAlmostFullRangeBytes, 0);

checkArgument(dictionaryUsefulCheckPerChunkFrequency >= 0, "dictionaryUsefulCheckPerChunkFrequency is negative");
this.dictionaryUsefulCheckPerChunkFrequency = dictionaryUsefulCheckPerChunkFrequency;

this.dictionaryUsefulCheckColumnSizeBytes = dictionaryUsefulCheckColumnSizeBytes;
directConversionCandidates.addAll(allWriters);
}

@@ -87,12 +127,12 @@ public int getDictionaryMemoryBytes()

public boolean isFull(long bufferedBytes)
{
// if the strip is big enough to flush, stop before we hit the absolute max, so we are
// if the stripe is big enough to flush, stop before we hit the absolute max, so we are
// not forced to convert a dictionary to direct to fit in memory
if (bufferedBytes > stripeMinBytes) {
return dictionaryMemoryBytes > dictionaryMemoryMaxBytesLow;
}
// strip is small, grow to the high water mark (so at the very least we have more information)
// stripe is small, grow to the high watermark (so at the very least we have more information)
return dictionaryMemoryBytes > dictionaryMemoryMaxBytesHigh;
}

@@ -107,30 +147,43 @@ public void reset()
public void finalOptimize(int bufferedBytes)
{
updateDirectConversionCandidates();
convertLowCompressionStreams(bufferedBytes);
convertLowCompressionStreams(true, bufferedBytes);
}

public void optimize(int bufferedBytes, int stripeRowCount)
@VisibleForTesting
boolean isUsefulCheckRequired(int dictionaryMemoryBytes)
{
// recompute the dictionary memory usage
dictionaryMemoryBytes = allWriters.stream()
.filter(writer -> !writer.isDirectEncoded())
.mapToInt(DictionaryColumnManager::getDictionaryBytes)
.sum();
if (dictionaryMemoryBytes < dictionaryUsefulCheckColumnSizeBytes) {
return false;
}

// update the dictionary growth history
allWriters.stream()
.filter(writer -> !writer.isDirectEncoded())
.forEach(column -> column.updateHistory(stripeRowCount));
dictionaryUsefulCheckCounter++;
if (dictionaryUsefulCheckCounter == dictionaryUsefulCheckPerChunkFrequency) {
dictionaryUsefulCheckCounter = 0;
return true;
}

if (dictionaryMemoryBytes <= dictionaryMemoryMaxBytesLow) {
return;
return false;
}
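// Note: with dictionaryUsefulCheckPerChunkFrequency set to Integer.MAX_VALUE (the
// compatibility default described in the commit message), the counter never reaches the
// frequency in practice, so the periodic usefulness check is effectively disabled and
// pre-change behavior is preserved.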

public void optimize(int bufferedBytes, int stripeRowCount)
{
// recompute the dictionary memory usage
int totalDictionaryBytes = 0;
for (DictionaryColumnManager writer : allWriters) {
if (!writer.isDirectEncoded()) {
totalDictionaryBytes += writer.getDictionaryBytes();
writer.updateHistory(stripeRowCount);
}
}
dictionaryMemoryBytes = totalDictionaryBytes;

updateDirectConversionCandidates();
boolean isDictionaryAlmostFull = dictionaryMemoryBytes > dictionaryMemoryMaxBytesLow;

// before any further checks, convert all low compression streams
bufferedBytes = convertLowCompressionStreams(bufferedBytes);
if (isDictionaryAlmostFull || isUsefulCheckRequired(dictionaryMemoryBytes)) {
updateDirectConversionCandidates();
bufferedBytes = convertLowCompressionStreams(isDictionaryAlmostFull, bufferedBytes);
}

if (dictionaryMemoryBytes <= dictionaryMemoryMaxBytesLow || bufferedBytes >= stripeMaxBytes) {
return;
@@ -161,7 +214,7 @@ private void optimizeDictionaryColumns(int stripeRowCount, BufferedBytesCounter
return;
}

// if the stripe is larger then the minimum stripe size, we are not required to convert any more dictionary columns to direct
// if the stripe is larger than the minimum stripe size, we are not required to convert any more dictionary columns to direct
if (bufferedBytesCounter.getBufferedBytes() >= stripeMinBytes) {
// check if we can get better compression by converting a dictionary column to direct. This can happen when there are multiple
// dictionary columns and one does not compress well, so if we convert it to direct we can continue to use the existing dictionaries
@@ -196,27 +249,35 @@ private boolean convertDictionaryColumn(BufferedBytesCounter bufferedBytesCounter
}

@VisibleForTesting
int convertLowCompressionStreams(int bufferedBytes)
int convertLowCompressionStreams(boolean tryAllStreams, int bufferedBytes)
{
// convert low compression columns to direct
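// When tryAllStreams is true (dictionary almost full, or finalOptimize), every candidate
// column is considered; when it is false (periodic usefulness check), only columns whose
// dictionary already exceeds dictionaryUsefulCheckColumnSizeBytes are considered.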
Iterator<DictionaryColumnManager> iterator = directConversionCandidates.iterator();
while (iterator.hasNext()) {
DictionaryColumnManager dictionaryWriter = iterator.next();
if (dictionaryWriter.getCompressionRatio() < DICTIONARY_MIN_COMPRESSION_RATIO) {
int columnBufferedBytes = toIntExact(dictionaryWriter.getBufferedBytes());
OptionalInt directBytes = tryConvertToDirect(dictionaryWriter, getMaxDirectBytes(bufferedBytes));
iterator.remove();
if (directBytes.isPresent()) {
bufferedBytes = bufferedBytes + directBytes.getAsInt() - columnBufferedBytes;
if (bufferedBytes >= stripeMaxBytes) {
return bufferedBytes;
if (tryAllStreams || dictionaryWriter.getDictionaryBytes() >= dictionaryUsefulCheckColumnSizeBytes) {
if (dictionaryWriter.getCompressionRatio() < DICTIONARY_MIN_COMPRESSION_RATIO) {
int columnBufferedBytes = toIntExact(dictionaryWriter.getBufferedBytes());
OptionalInt directBytes = tryConvertToDirect(dictionaryWriter, getMaxDirectBytes(bufferedBytes));
iterator.remove();
if (directBytes.isPresent()) {
bufferedBytes = bufferedBytes + directBytes.getAsInt() - columnBufferedBytes;
if (bufferedBytes >= stripeMaxBytes) {
return bufferedBytes;
}
}
}
}
}
return bufferedBytes;
}

@VisibleForTesting
List<DictionaryColumnManager> getDirectConversionCandidates()
{
return directConversionCandidates;
}

private void updateDirectConversionCandidates()
{
// Writers can switch to Direct encoding internally. Remove them from direct conversion candidates.
@@ -255,14 +316,14 @@ private double currentCompressionRatio(int totalNonDictionaryBytes)
}

/**
* Choose a dictionary column to convert to direct encoding. We do this by predicting the compression ration
* Choose a dictionary column to convert to direct encoding. We do this by predicting the compression ratio
* of the stripe if a single column is flipped to direct. So for each column, we try to predict the row count
* when we will hit a stripe flush limit if that column were converted to direct. Once we know the row count, we
* calculate the predicted compression ratio.
*
* @param totalNonDictionaryBytes current size of the stripe without the dictionary columns
* @param stripeRowCount current number of rows in the stripe
* @return the column that would produce the best stripe compression ration if converted to direct
* @return the column that would produce the best stripe compression ratio if converted to direct
*/
private DictionaryCompressionProjection selectDictionaryColumnToConvert(int totalNonDictionaryBytes, int stripeRowCount)
{
@@ -305,7 +366,7 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int totalNonDictionaryBytes, int stripeRowCount)
long currentIndexBytes = totalDictionaryIndexBytes - column.getIndexBytes();
long currentTotalBytes = currentRawBytes + currentDictionaryBytes + currentIndexBytes;

// estimate the size of each new row if we were convert this column to direct
// estimate the size of each new row if we were to convert this column to direct
double rawBytesPerFutureRow = totalNonDictionaryBytesPerRow + column.getRawBytesPerRow();
double dictionaryBytesPerFutureRow = totalDictionaryBytesPerNewRow - column.getDictionaryBytesPerFutureRow();
double indexBytesPerFutureRow = totalDictionaryIndexBytesPerRow - column.getIndexBytesPerRow();
Expand All @@ -317,7 +378,7 @@ private DictionaryCompressionProjection selectDictionaryColumnToConvert(int tota
long rowsToStripeRowLimit = stripeMaxRowCount - stripeRowCount;
long rowsToLimit = Longs.min(rowsToDictionaryMemoryLimit, rowsToStripeMemoryLimit, rowsToStripeRowLimit);

// predict the compression ratio at that limit if we were convert this column to direct
// predict the compression ratio at that limit if we were to convert this column to direct
long predictedUncompressedSizeAtLimit = totalNonDictionaryBytes + totalDictionaryRawBytes + (totalUncompressedBytesPerRow * rowsToLimit);
long predictedCompressedSizeAtLimit = (long) (currentTotalBytes + (totalBytesPerFutureRow * rowsToLimit));
double predictedCompressionRatioAtLimit = 1.0 * predictedUncompressedSizeAtLimit / predictedCompressedSizeAtLimit;
Expand Down Expand Up @@ -371,7 +432,8 @@ public interface DictionaryColumn
boolean isDirectEncoded();
}

private static class DictionaryColumnManager
@VisibleForTesting
static class DictionaryColumnManager
{
private final DictionaryColumn dictionaryColumn;

Expand Down Expand Up @@ -481,6 +543,12 @@ public boolean isDirectEncoded()
{
return dictionaryColumn.isDirectEncoded();
}

@VisibleForTesting
public DictionaryColumn getDictionaryColumn()
{
return dictionaryColumn;
}
}

private static class DictionaryCompressionProjection
10 changes: 7 additions & 3 deletions presto-orc/src/main/java/com/facebook/presto/orc/OrcWriter.java
@@ -317,14 +317,18 @@ public OrcWriter(
}
}
this.columnWriters = columnWriters.build();
this.dictionaryMaxMemoryBytes = toIntExact(
requireNonNull(options.getDictionaryMaxMemory(), "dictionaryMaxMemory is null").toBytes());
this.dictionaryMaxMemoryBytes = toIntExact(options.getDictionaryMaxMemory().toBytes());
int dictionaryMemoryAlmostFullRangeBytes = toIntExact(options.getDictionaryMemoryAlmostFullRange().toBytes());
int dictionaryUsefulCheckColumnSizeBytes = toIntExact(options.getDictionaryUsefulCheckColumnSize().toBytes());
this.dictionaryCompressionOptimizer = new DictionaryCompressionOptimizer(
dictionaryColumnWriters.build(),
stripeMinBytes,
stripeMaxBytes,
stripeMaxRowCount,
dictionaryMaxMemoryBytes);
dictionaryMaxMemoryBytes,
dictionaryMemoryAlmostFullRangeBytes,
dictionaryUsefulCheckColumnSizeBytes,
options.getDictionaryUsefulCheckPerChunkFrequency());

for (Entry<String, String> entry : this.userMetadata.entrySet()) {
recordValidation(validation -> validation.addMetadataProperty(entry.getKey(), utf8Slice(entry.getValue())));
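For context, a hedged usage sketch of configuring these options when
building a writer. OrcWriterOptions.builder() and the with* setter
names below are assumptions inferred from the option getters used in
this diff, not confirmed by it:

    OrcWriterOptions options = OrcWriterOptions.builder()              // builder() assumed
            .withDictionaryMaxMemory(new DataSize(80, MEGABYTE))        // assumed setter
            .withDictionaryMemoryAlmostFullRange(new DataSize(8, MEGABYTE))   // assumed setter
            .withDictionaryUsefulCheckColumnSize(new DataSize(6, MEGABYTE))   // assumed setter
            .withDictionaryUsefulCheckPerChunkFrequency(1024)                  // assumed setter
            .build();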
(The remaining 4 changed files are not shown here.)
