Skip to content

Commit

Permalink
Merge pull request #165 from cta-observatory/zcat
Browse files Browse the repository at this point in the history
Implement using of zcat for gzip files
  • Loading branch information
maxnoe authored Apr 29, 2019
2 parents 1400a26 + ee40dc8 commit 5282bbf
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 20 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ before_script:
- source ./download_test_files.sh

script:
- ls -lh tests/resources
- python setup.py test
- python setup.py check -r -s

Expand Down
63 changes: 61 additions & 2 deletions eventio/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import struct
import gzip
import logging
import subprocess as sp

from .file_types import is_gzip, is_eventio, is_zstd
from .header import parse_header_bytes, get_bits_from_word
Expand All @@ -18,18 +19,74 @@
KNOWN_OBJECTS = {}


class PipeWrapper:
'''
This class makes a sp.PIPE forward-seekable
by keeping track of the bytes already read
and reading as many bytes as required in `seek`
'''
def __init__(self, pipe):
self.pipe = pipe
self.pos = 0

def read(self, size=-1):
data = self.pipe.read(size)
self.pos += len(data)
return data

def seek(self, offset, whence=0):
if whence == 0:
to_read = offset - self.pos
if to_read < 0:
raise IOError('Only forward seeking possible')
self.pos += len(self.pipe.read(to_read))

if whence == 1:
if offset < 0:
raise IOError('Only forward seeking possible')
self.pos += len(self.pipe.read(offset))

if whence == 2:
raise IOError('Only forward seeking possible')

return self.pos

def close(self):
return self.pipe.close()


class EventIOFile:

def __init__(self, path):
def __init__(self, path, zcat=True):
log.info('Opening new file {}'.format(path))
self.path = path
self.read_process = None

if not is_eventio(path):
raise ValueError('File {} is not an eventio file'.format(path))

if is_gzip(path):
log.info('Found gzipped file')
self._filehandle = gzip.open(path, mode='rb')
if zcat:
try:
log.info('Trying to read using zcat')
self.read_process = sp.Popen(
['gzip', '-cd', path], stdout=sp.PIPE, stderr=sp.PIPE
)
self.read_process.poll()
rc = self.read_process.returncode
if rc is not None and rc != 0:
raise ValueError(self.read_process.stderr.read().decode())

self._filehandle = PipeWrapper(self.read_process.stdout)
log.info('Using zcat')
except Exception as e:
log.info(str(e))
log.warning('Falling back to gzip module')
self._filehandle = gzip.open(path)
else:
log.info('Using gzip module')
self._filehandle = gzip.open(path)

elif is_zstd(path):
log.info('Found zstd compressed file')
Expand Down Expand Up @@ -83,6 +140,8 @@ def __exit__(self, exc_type, exc_value, traceback):

def close(self):
self._filehandle.close()
if self.read_process is not None:
self.read_process.terminate()


def check_size_or_raise(data, expected_length, zero_ok=True):
Expand Down
4 changes: 2 additions & 2 deletions eventio/iact/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ class IACTFile(EventIOFile):
RunEnd
'''

def __init__(self, path):
super().__init__(path)
def __init__(self, path, zcat=True):
super().__init__(path, zcat=zcat)

header_object = next(self)
check_type(header_object, RunHeader)
Expand Down
4 changes: 2 additions & 2 deletions eventio/simtel/simtelfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class NoTrackingPositions(Exception):


class SimTelFile(EventIOFile):
def __init__(self, path, allowed_telescopes=None, skip_calibration=False):
super().__init__(path)
def __init__(self, path, allowed_telescopes=None, skip_calibration=False, zcat=True):
super().__init__(path, zcat=zcat)

self.path = path
self.allowed_telescopes = None
Expand Down
11 changes: 7 additions & 4 deletions eventio/var_int.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ cpdef uint8_t get_length_of_varint(const uint8_t first_byte):
@cython.boundscheck(False)
@cython.wraparound(False) # disable negative indexing
cpdef uint64_t parse_varint(const uint8_t[:] var_int_bytes):
length = var_int_bytes.shape[0]
cdef uint8_t length = var_int_bytes.shape[0]
cdef uint64_t v[9]
cdef uint8_t i = 0
for i in range(length):
Expand Down Expand Up @@ -317,7 +317,9 @@ def simtel_pixel_timing_parse_list_type_2(
bint glob_only_selected,
float granularity,
):
cdef uint32_t start, stop, list_index
cdef int16_t start, stop
cdef uint32_t list_index
cdef uint32_t n_lists = pixel_list.shape[0]
cdef uint32_t i_pix, i_type
cdef uint64_t pos = 0
cdef uint32_t length = 0
Expand All @@ -326,7 +328,7 @@ def simtel_pixel_timing_parse_list_type_2(
cdef np.ndarray[int32_t, ndim=2] pulse_sum_loc = np.zeros((n_gains, n_pixels), dtype=INT32)
cdef np.ndarray[int32_t, ndim=2] pulse_sum_glob = np.zeros((n_gains, n_pixels), dtype=INT32)

for list_index in range(pixel_list.shape[0]):
for list_index in range(n_lists):
start = pixel_list[list_index][0]
stop = pixel_list[list_index][1]

Expand Down Expand Up @@ -379,7 +381,8 @@ def parse_1208(
amplitude = None

cdef uint64_t pos = 0
cdef uint32_t i, j
cdef uint32_t i
cdef int32_t j

for i in range(nonempty):
if version > 2:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def finalize_options(self):

setup(
name='eventio',
version='0.19.1',
version='0.20.0',
description='Python read-only implementation of the EventIO file format',
long_description=long_description,
url='https://github.com/fact-project/pyeventio',
Expand Down
11 changes: 5 additions & 6 deletions tests/test_histograms.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from eventio.search_utils import collect_toplevel_of_type
from eventio.search_utils import yield_toplevel_of_type


prod4b_sst1m_file = 'tests/resources/gamma_20deg_0deg_run102___cta-prod4-sst-1m_desert-2150m-Paranal-sst-1m.simtel.gz'
Expand All @@ -22,15 +22,14 @@ def test_histograms():

with EventIOFile(prod4b_sst1m_file) as f:

objects = collect_toplevel_of_type(f, Histograms)
assert len(objects) > 0

for obj in objects:
n_read = 0
for obj in yield_toplevel_of_type(f, Histograms):
hists = obj.parse()
unread = obj.read()
assert len(unread) == 0 or all(b == 0 for b in unread)
n_read += 1

for hist, title in zip(hists, titles):
assert hist['title'] == title


assert n_read == 1
16 changes: 13 additions & 3 deletions tests/test_open_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,19 @@ def test_file_has_correct_types():
assert types == [1200, 1212, 1201, 1202, 1203, 1204, 1209, 1210]


def test_types_gzipped():
testfile = 'tests/resources/one_shower.dat'
f = eventio.EventIOFile(testfile)
def test_types_gzip():
testfile = 'tests/resources/one_shower.dat.gz'
f = eventio.EventIOFile(testfile, zcat=False)
types = [o.header.type for o in f]

assert types == [1200, 1212, 1201, 1202, 1203, 1204, 1209, 1210]


def test_types_zcat():
from eventio.base import PipeWrapper
testfile = 'tests/resources/one_shower.dat.gz'
f = eventio.EventIOFile(testfile)

assert isinstance(f._filehandle, PipeWrapper)
types = [o.header.type for o in f]
assert types == [1200, 1212, 1201, 1202, 1203, 1204, 1209, 1210]

0 comments on commit 5282bbf

Please sign in to comment.