Skip to content

Commit

Permalink
Access data using an iterator to stream records (#11)
Browse files Browse the repository at this point in the history
Thanks @smalot for the contribution!
  • Loading branch information
smalot authored Oct 1, 2020
1 parent d3cdb97 commit 257a9ab
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 23 deletions.
81 changes: 58 additions & 23 deletions lib/avro/data_file.php
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ private function read_header()
* @return array of data from object container.
* @throws AvroDataIOException
* @throws AvroIOException
* @internal Would be nice to implement data() as an iterator, I think
*/
public function data()
{
Expand All @@ -309,34 +308,70 @@ public function data()
if ($this->is_eof())
break;

$length = $this->read_block_header();
$decoder = $this->decoder;
if ($this->codec == AvroDataIO::DEFLATE_CODEC) {
if (!function_exists('gzinflate')) {
throw new AvroDataIOException('"gzinflate" function not available, "zlib" extension required.');
}
$compressed = $decoder->read($length);
$datum = gzinflate($compressed);
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
} elseif ($this->codec == AvroDataIO::SNAPPY_CODEC) {
if (!function_exists('snappy_uncompress')) {
throw new AvroDataIOException('"snappy_uncompress" function not available, "snappy" extension required.');
}
$compressed = $decoder->read($length-4);
$datum = snappy_uncompress($compressed);
$crc32 = unpack('N', $decoder->read(4));
if ($crc32[1] != crc32($datum)) {
throw new AvroDataIOException('Invalid CRC32 checksum.');
}
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
}
$decoder = $this->apply_codec($this->decoder, $this->codec);
}
$data []= $this->datum_reader->read($decoder);
$data[] = $this->datum_reader->read($decoder);
$this->block_count -= 1;
}
return $data;
}

/**
* @throws AvroDataIOException
* @throws AvroIOException
*/
public function data_iterator()
{
while (true)
{
if (0 == $this->block_count)
{
if ($this->is_eof())
break;

if ($this->skip_sync())
if ($this->is_eof())
break;

$decoder = $this->apply_codec($this->decoder, $this->codec);
}
yield $this->datum_reader->read($decoder);
$this->block_count -= 1;
}
}

/**
* @param AvroIOBinaryDecoder $decoder
* @param string $codec
* @return AvroIOBinaryDecoder
* @throws AvroDataIOException
* @throws AvroIOException
*/
protected function apply_codec($decoder, $codec)
{
$length = $this->read_block_header();
if ($codec == AvroDataIO::DEFLATE_CODEC) {
if (!function_exists('gzinflate')) {
throw new AvroDataIOException('"gzinflate" function not available, "zlib" extension required.');
}
$compressed = $decoder->read($length);
$datum = gzinflate($compressed);
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
} elseif ($codec == AvroDataIO::SNAPPY_CODEC) {
if (!function_exists('snappy_uncompress')) {
throw new AvroDataIOException('"snappy_uncompress" function not available, "snappy" extension required.');
}
$compressed = $decoder->read($length-4);
$datum = snappy_uncompress($compressed);
$crc32 = unpack('N', $decoder->read(4));
if ($crc32[1] != crc32($datum)) {
throw new AvroDataIOException('Invalid CRC32 checksum.');
}
$decoder = new AvroIOBinaryDecoder(new AvroStringIO($datum));
}
return $decoder;
}

/**
* Closes this writer (and its AvroIO object.)
* @uses AvroIO::close()
Expand Down
16 changes: 16 additions & 0 deletions test/FileIOTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ private function read()
return $reader->data();
}

private function readIterator()
{
$fileName = $this->getFileName();
$reader = AvroDataIO::open_file($fileName);
$data = [];
foreach ($reader->data_iterator() as $row) {
$data[] = $row;
}
return $data;
}

public function testReading()
{
$expected = [
Expand All @@ -62,7 +73,12 @@ public function testReading()
'favorite_numbers' => [],
]
];

// Classic loading.
$this->assertEquals($expected, $this->read());

// Iterator loading.
$this->assertEquals($expected, $this->readIterator());
}

/**
Expand Down

0 comments on commit 257a9ab

Please sign in to comment.