Merge branch 'release'
MarcusKlik committed Jun 6, 2018
2 parents 6eeb3e3 + 978410b commit 5b83d5e
Showing 10 changed files with 60 additions and 36 deletions.
1 change: 1 addition & 0 deletions .Rbuildignore
@@ -7,6 +7,7 @@
LZ4/LICENSE$
\.md$
^docs$
\.TMP$
\.png$
\.yml$
dataset\.fst$
4 changes: 2 additions & 2 deletions DESCRIPTION
@@ -5,8 +5,8 @@ Description: Multithreaded serialization of compressed data frames using the
'fst' format. The 'fst' format allows for random access of stored data and
compression with the LZ4 and ZSTD compressors created by Yann Collet. The ZSTD
compression library is owned by Facebook Inc.
Version: 0.8.6
Date: 2018-05-15
Version: 0.8.8
Date: 2018-06-06
Authors@R: c(
person("Mark", "Klik", email = "markklik@gmail.com", role = c("aut", "cre", "cph")),
person("Yann", "Collet", role = c("ctb", "cph"),
6 changes: 5 additions & 1 deletion NEWS.md
@@ -1,5 +1,9 @@

# fst 0.8.6
# fst 0.8.8 (June 6, 2018)

Version 0.8.8 of the `fst` package is an intermediate release designed to fix valgrind warnings reported on CRAN builds (at the request of the CRAN maintainers). These warnings were due to `fst` writing uninitialized data buffers to file, which was done to maximize speed. To fix these warnings (and for safety), all memory blocks are now initialized to zero before being written to disk.

# fst 0.8.6 (May 15, 2018)

Version 0.8.6 of the `fst` package brings clearer printing of `fst_table` objects. It also includes optimizations for controlling the number of threads used by the package during reads and writes and after a fork has ended. The `LZ4` and `ZSTD` compression libraries are updated to their latest (and fastest) releases. UTF-8 encoded column names are now correctly stored in the `fst` format.

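The zero-initialization described in the 0.8.8 entry above follows one pattern throughout the changed C++ sources: allocate a buffer, clear it with `memset`, set the fields that are actually used, then write. A minimal sketch of that pattern, using illustrative field names and sizes rather than the actual `fst` metadata layout:

```cpp
#include <cstring>   // memset
#include <fstream>
#include <memory>

// Zero a metadata buffer before writing it, so that padding and unused bytes
// are defined when the block reaches the file. This silences valgrind's
// "uninitialised byte(s)" warnings on the write syscall.
void write_meta_block(std::ofstream& myfile)
{
  const unsigned int metaSize = 24;                    // illustrative size
  std::unique_ptr<char[]> metaP(new char[metaSize]);
  char* meta = metaP.get();

  memset(meta, 0, metaSize);                           // clear memory for safety

  unsigned int* version  = reinterpret_cast<unsigned int*>(meta);      // illustrative field
  unsigned int* nrOfRows = reinterpret_cast<unsigned int*>(&meta[4]);  // illustrative field
  *version  = 1;
  *nrOfRows = 1000;

  myfile.write(meta, metaSize);                        // every byte written is now initialized
}
```

In the diff below, `blockstreamer_v2.cpp`, `character_v6.cpp` and `fststore.cpp` all apply this same clear-before-write step.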
11 changes: 3 additions & 8 deletions cran-comments.md
@@ -1,7 +1,7 @@

## Submission

In this submission of fst, build errors reported in the CRAN check results for version 0.8.4 are addressed (thanks Kurt Hornik for the warning). These errors were due to failing unit tests and have been traced back to changes in the data.table code base (for the ITime type) between version 1.10.4-3 and 1.11.0. All issues have been resolved in this release.
This submission of fst addresses valgrind warnings reported for the v0.8.6 package build on CRAN. These warnings are caused by writing uninitialized (meta-data) buffers to file (done to increase write performance). With this submission, all allocated memory is initialized before writing.

## Test environments

@@ -25,17 +25,12 @@ The install size on different platforms varies significantly, from 1.42 MB (wind

## Valgrind

The following warnings are generated with valgrind when tests are run:

* Syscall param write(buf) points to uninitialised byte(s)
* Conditional jump or move depends on uninitialised value(s)
* Syscall param writev(vector[...]) points to uninitialised byte(s)

Like in previous fst versions, all warnings are generated in source file 'src/fstcore/interface/fststore.cpp' and are caused by writing uninitialised data to file. This is done intentionally (to increase performance) and the specific on-disk data is overwritten at a later point with initialised values.
To reproduce the CRAN valgrind report, an instrumented (level 2) build of R was constructed on a fresh Ubuntu 16.04 image using the config.site and configure parameters specified in the memtests README file on CRAN. That build shows no valgrind warnings with the current submission.

## Downstream dependencies

I have run R CMD check on downstream dependencies and found no issues:

* heims: runs without warnings or errors.
* rio: runs without warnings or errors.
* grattan: runs without warnings or errors.
15 changes: 13 additions & 2 deletions src/fstcore/blockstreamer/blockstreamer_v2.cpp
@@ -196,6 +196,12 @@ void fdsStreamUncompressed_v2(ofstream& myfile, char* vec, unsigned long long ve
}


// header structure
//
// 4 | unsigned int | maximum compressed size of block
// 4 | unsigned int | number of elements in block


// Method for writing column data of any type to a stream.
void fdsStreamcompressed_v2(ofstream& myfile, char* colVec, unsigned long long nrOfRows, int elementSize,
StreamCompressor* streamCompressor, int blockSizeElems, std::string annotation, bool hasAnnotation)
@@ -225,9 +231,12 @@ void fdsStreamcompressed_v2(ofstream& myfile, char* colVec, unsigned long long n

// Blocks meta information
// Aligned at 8 byte boundary
std::unique_ptr<char[]> blockIndexP(new char[(2 + nrOfBlocks) * 8]);
unsigned int block_index_size = (2 + nrOfBlocks) * 8;
std::unique_ptr<char[]> blockIndexP(new char[block_index_size]);
char* blockIndex = blockIndexP.get(); // 1 long file pointer with 2 highest bytes indicating algorithmID

memset(blockIndex, 0, block_index_size);

unsigned int* maxCompSize = reinterpret_cast<unsigned int*>(&blockIndex[0]); // maximum uncompressed block length
unsigned int* blockSizeElements = reinterpret_cast<unsigned int*>(&blockIndex[4]); // number of elements per block

@@ -259,6 +268,8 @@ void fdsStreamcompressed_v2(ofstream& myfile, char* colVec, unsigned long long n
std::unique_ptr<char[]> threadBufferP(new char[nrOfThreads * MAX_COMPRESSBOUND * batchSize]);
char* threadBuffer = threadBufferP.get();

// TODO: possibly memset to zero to avoid valgrind warnings

int nrOfBatches = nrOfBlocks / batchSize; // number of complete batches with complete blocks

if (nrOfBatches > 0)
@@ -352,7 +363,7 @@ void fdsStreamcompressed_v2(ofstream& myfile, char* colVec, unsigned long long n
// Might be useful in a future implementation
*maxCompSize = maxCompressionSize;

// Write last block position
// Write last block position; note that nrOfBlocks was decreased by 1 earlier
blockPosition = reinterpret_cast<unsigned long long*>(&blockIndex[COL_META_SIZE + 8 + nrOfBlocks * 8]);
*blockPosition = blockIndexPos;

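The header comment and pointer casts in `fdsStreamcompressed_v2` above describe the small block index written for each column: two 4-byte counters followed by one 8-byte file position per block boundary, with the two highest bytes of each position reserved for an algorithm ID. A hedged sketch of writing such an index, leaving out the column metadata and the algorithm-ID packing of the real `fst` format:

```cpp
#include <cstring>   // memset
#include <fstream>
#include <memory>

// Simplified block index layout (not the full fst format):
//   4 bytes | unsigned int       | maximum compressed size of a block
//   4 bytes | unsigned int       | number of elements per block
//   8 bytes | unsigned long long | file position per block boundary
void write_block_index(std::ofstream& myfile, unsigned int nrOfBlocks)
{
  const unsigned int block_index_size = (2 + nrOfBlocks) * 8;
  std::unique_ptr<char[]> blockIndexP(new char[block_index_size]);
  char* blockIndex = blockIndexP.get();

  memset(blockIndex, 0, block_index_size);  // all bytes defined before the buffer is written

  unsigned int* maxCompSize       = reinterpret_cast<unsigned int*>(&blockIndex[0]);
  unsigned int* blockSizeElements = reinterpret_cast<unsigned int*>(&blockIndex[4]);
  unsigned long long* blockPos    = reinterpret_cast<unsigned long long*>(&blockIndex[8]);

  *maxCompSize       = 4096;   // illustrative values
  *blockSizeElements = 1024;
  blockPos[0]        = 0;      // the real code stores one compressed file position per block

  myfile.write(blockIndex, block_index_size);
}
```

Because the whole index is cleared up front, any bytes that are never explicitly assigned (for example alignment padding) are defined when the buffer is written.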
11 changes: 9 additions & 2 deletions src/fstcore/character/character_v6.cpp
@@ -28,6 +28,7 @@

#include <fstream>
#include <memory>
#include <cstring> // memset


// #include <boost/unordered_map.hpp>
@@ -114,6 +115,9 @@ void fdsWriteCharVec_v6(ofstream& myfile, IStringWriter* stringWriter, int compr
std::unique_ptr<char[]> metaP(new char[metaSize]);
char* meta = metaP.get();

// clear memory for safety
memset(meta, 0, metaSize);

// Set column header
unsigned int* isCompressed = reinterpret_cast<unsigned int*>(meta);
unsigned int* blockSizeChar = reinterpret_cast<unsigned int*>(&meta[4]);
@@ -151,6 +155,9 @@ void fdsWriteCharVec_v6(ofstream& myfile, IStringWriter* stringWriter, int compr
std::unique_ptr<char[]> metaP(new char[metaSize]);
char* meta = metaP.get();

// clear memory for safety
memset(meta, 0, metaSize);

// Set column header
unsigned int* isCompressed = reinterpret_cast<unsigned int*>(meta);
unsigned int* blockSizeChar = reinterpret_cast<unsigned int*>(&meta[4]);
@@ -204,7 +211,7 @@ void fdsWriteCharVec_v6(ofstream& myfile, IStringWriter* stringWriter, int compr

stringWriter->SetBuffersFromVec(block * BLOCKSIZE_CHAR, (block + 1) * BLOCKSIZE_CHAR);
unsigned long long totSize = storeCharBlockCompressed_v6(myfile, stringWriter, block * BLOCKSIZE_CHAR,
(block + 1) * BLOCKSIZE_CHAR, streamCompressInt, streamCompressChar, *algoInt, *algoChar, *intBufSize, block);
(block + 1) * BLOCKSIZE_CHAR, streamCompressInt, streamCompressChar, *algoInt, *algoChar, *intBufSize, block);

fullSize += totSize;
*blockPos = fullSize;
@@ -218,7 +225,7 @@ void fdsWriteCharVec_v6(ofstream& myfile, IStringWriter* stringWriter, int compr

stringWriter->SetBuffersFromVec(nrOfBlocks * BLOCKSIZE_CHAR, vecLength);
unsigned long long totSize = storeCharBlockCompressed_v6(myfile, stringWriter, nrOfBlocks * BLOCKSIZE_CHAR,
vecLength, streamCompressInt, streamCompressChar, *algoInt, *algoChar, *intBufSize, nrOfBlocks);
vecLength, streamCompressInt, streamCompressChar, *algoInt, *algoChar, *intBufSize, nrOfBlocks);

fullSize += totSize;
*blockPos = fullSize;
20 changes: 13 additions & 7 deletions src/fstcore/compression/compression.cpp
@@ -657,13 +657,14 @@ void LogicDecompr64(char* logicalVec, const unsigned long long* compBuf, int nrO
}


// Compression buffer should be at least 1 + (nrOfLogicals - 1) / 256 elements in length (factor 32)
// Compression buffer should be at least 1 + (nrOfLogicals - 1) / 256 elements (long ints) in length (factor 32)
void LogicCompr64(const char* logicalVec, unsigned long long* compress, int nrOfLogicals)
{
const unsigned long long* logicals = (const unsigned long long*) logicalVec;
int nrOfLongs = nrOfLogicals / 32;
int nrOfLongs = nrOfLogicals / 32; // number of full longs

// Define filters
// TODO: define these as constants
unsigned long long BIT = (1LL << 32) | 1LL;
unsigned long long BIT0 = (BIT << 16) | (BIT << 15);
unsigned long long BIT1 = (BIT << 17) | (BIT << 14);
@@ -710,15 +711,20 @@ void LogicCompr64(const char* logicalVec, unsigned long long* compress, int nrOf


// Process remainder
int remain = nrOfLogicals % 32;
int remain = nrOfLogicals % 32; // nr of logicals remaining
if (remain == 0) return;

unsigned long long remainLongs[16]; // at maximum nrOfRemainLongs equals 16
int* remain_ints = reinterpret_cast<int*>(remainLongs);

// Compress the remainder in the same manner as the full blocks (for random access)
logics = &logicals[16 * nrOfLongs];

int nrOfRemainLongs = 1 + (remain - 1) / 2; // per 2 logicals
unsigned long long remainLongs[16]; // at maximum nrOfRemainLongs equals 16
const int nrOfRemainLongs = 1 + (remain - 1) / 2; // per 2 logicals

// please valgrind: only use initialized bytes for calculations
memcpy(remainLongs, logics, remain * sizeof(int));
memset(&remain_ints[remain], 0, (32 - remain) * 4); // clear remaining ints

unsigned long long compRes = 0;
for (int remainNr = 0; remainNr < nrOfRemainLongs; ++remainNr)
@@ -1030,8 +1036,8 @@ unsigned int ZSTD_INT_TO_SHORT_SHUF2_C(char* dst, unsigned int dstCapacity, cons

unsigned int ZSTD_INT_TO_SHORT_SHUF2_D(char* dst, unsigned int dstCapacity, const char* src, unsigned int compressedSize)
{
int nrOfLongs = 1 + (dstCapacity - 1) / 16; // srcSize is processed in blocks of 32 bytes
int nrOfDstInts = dstCapacity / 4;
unsigned int nrOfLongs = 1 + (dstCapacity - 1) / 16; // srcSize is processed in blocks of 32 bytes
unsigned int nrOfDstInts = dstCapacity / 4;

// Compress buffer
char buf[MAX_SIZE_COMPRESS_BLOCK_HALF];
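The remainder handling added to `LogicCompr64` above copies only the initialized trailing logicals into a small stack buffer and zeroes the unused tail before packing, so the packing loop never reads undefined bytes. The copy-then-clear step in isolation, with the actual bit packing reduced to a placeholder:

```cpp
#include <cstring>   // memcpy, memset

// Copy the 'remain' trailing logicals (stored as 32-bit ints) into a zeroed
// scratch buffer before packing them. 'out' must hold at least 16 longs.
// The packing itself is only a placeholder here; fst packs 32 logicals per long.
void pack_remainder(const int* trailing, int remain /* 1..31 */, unsigned long long* out)
{
  unsigned long long remainLongs[16];                            // 16 longs == 32 int slots
  int* remain_ints = reinterpret_cast<int*>(remainLongs);

  memcpy(remainLongs, trailing, remain * sizeof(int));           // initialized part only
  memset(&remain_ints[remain], 0, (32 - remain) * sizeof(int));  // clear the unused tail

  const int nrOfRemainLongs = 1 + (remain - 1) / 2;              // 2 ints per long
  for (int remainNr = 0; remainNr < nrOfRemainLongs; ++remainNr)
  {
    out[remainNr] = remainLongs[remainNr];                       // stand-in for the real bit packing
  }
}
```

Padding with zeros rather than skipping the tail keeps the remainder block the same shape as a full block, which is presumably what the "for random access" comment in the source refers to.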
8 changes: 8 additions & 0 deletions src/fstcore/interface/fststore.cpp
@@ -236,6 +236,10 @@ void FstStore::fstWrite(IFstTable &fstTable, const int compress) const
// size of fst file header
const unsigned long long metaDataSize = tableHeaderSize + keyIndexHeaderSize + chunksetHeaderSize + colNamesHeaderSize;
char * metaDataWriteBlock = new char[metaDataSize]; // fst metadata

// clear memory for safety (avoids valgrind warnings)
memset(metaDataWriteBlock, 0, metaDataSize);

std::unique_ptr<char[]> metaDataPtr = std::unique_ptr<char[]>(metaDataWriteBlock);


@@ -375,6 +379,10 @@ void FstStore::fstWrite(IFstTable &fstTable, const int compress) const
// Size of chunkset index header plus data chunk header
const unsigned long long chunkIndexSize = CHUNK_INDEX_SIZE + DATA_INDEX_SIZE + 8 * nrOfCols;
char* chunkIndex = new char[chunkIndexSize];

// clear memory for safety
memset(chunkIndex, 0, chunkIndexSize);

std::unique_ptr<char[]> chunkIndexPtr = std::unique_ptr<char[]>(chunkIndex);

// Chunkset data index [node D, leaf of C] [size: 96]
16 changes: 2 additions & 14 deletions src/fstcore/logical/logical_v10.cpp
@@ -36,19 +36,7 @@ using namespace std;
void fdsWriteLogicalVec_v10(ofstream &myfile, int* boolVector, unsigned long long nrOfLogicals, int compression,
std::string annotation, bool hasAnnotation)
{
// TODO: create multi-threaded code for a fixed ratio compressor

//if (compression == 0)
//{
// FixedRatioCompressor* compressor = new FixedRatioCompressor(CompAlgo::LOGIC64); // compression level not relevant here
// fdsStreamUncompressed_v2(myfile, (char*) boolVector, nrOfLogicals, 4, BLOCKSIZE_LOGICAL, compressor, annotation, hasAnnotation);

// delete compressor;

// return;
//}

int blockSize = 4 * BLOCKSIZE_LOGICAL; // block size in bytes
const int blockSize = 4 * BLOCKSIZE_LOGICAL; // block size in bytes

if (compression <= 50) // compress 1 - 50
{
Expand All @@ -57,7 +45,7 @@ void fdsWriteLogicalVec_v10(ofstream &myfile, int* boolVector, unsigned long lon
StreamCompressor* streamCompressor = new StreamCompositeCompressor(defaultCompress, compress2, 2 * compression);
streamCompressor->CompressBufferSize(blockSize);

fdsStreamcompressed_v2(myfile, (char*) boolVector, nrOfLogicals, 4, streamCompressor, BLOCKSIZE_LOGICAL, annotation, hasAnnotation);
fdsStreamcompressed_v2(myfile, reinterpret_cast<char*>(boolVector), nrOfLogicals, 4, streamCompressor, BLOCKSIZE_LOGICAL, annotation, hasAnnotation);

delete defaultCompress;
delete compress2;
4 changes: 4 additions & 0 deletions tests/testthat/test_lintr.R
@@ -8,6 +8,10 @@ library(lintr)
# * RcppExports not excluded

test_that("Package Style", {

# lintr throws a lot of valgrind warnings, so skip on CRAN for now
skip_on_cran()

lints <- with_defaults(line_length_linter = line_length_linter(120))
lints <- lints[!(names(lints) %in%
c("object_usage_linter", "camel_case_linter", "commas_linter", "multiple_dots_linter"))]
