Skip to content

Commit

Permalink
Merge pull request #25 from jacanchaplais/feature/lhe-read-24
Browse files Browse the repository at this point in the history
Added interfaces to iterative read LHE event data from file #24
  • Loading branch information
jacanchaplais authored Jun 3, 2024
2 parents 06ecb3c + 8397418 commit 1ad071a
Showing 1 changed file with 129 additions and 12 deletions.
141 changes: 129 additions & 12 deletions showerpipe/lhe.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,37 @@
redistribute and repeat hard events, outputting valid lhe files.
"""

from typing import Union, Iterator, Optional, Callable
import dataclasses as dc
import gzip
import io
import itertools as it
import re
import gzip
import shutil
from contextlib import contextmanager, ExitStack
import tempfile
from pathlib import Path
from contextlib import ExitStack, contextmanager
from copy import deepcopy
from itertools import chain
from functools import cached_property
from xml.sax.saxutils import unescape
from pathlib import Path
from typing import Callable, Iterator, Optional, Union
from urllib.parse import urlparse
from urllib.request import urlopen
from xml.sax.saxutils import unescape

import numpy as np
import numpy.typing as npt
from lxml import etree # type: ignore
from lxml.etree import ElementBase


__all__ = ["source_adapter", "load_lhe", "count_events", "split", "LheData"]
__all__ = [
"source_adapter",
"load_lhe",
"count_events",
"split",
"LheData",
"LheData",
"event_iter_text",
"event_iter",
]

_LHE_STORAGE = Union[Path, str, bytes]

Expand Down Expand Up @@ -68,9 +79,9 @@ def source_adapter(source: _LHE_STORAGE) -> Iterator[io.BufferedIOBase]:
if is_path:
path = Path(source) # type: ignore
try:
with io.open(path, "r") as lhe_filecheck:
with open(path) as lhe_filecheck:
lhe_filecheck.read(1)
out_io = io.open(path, "rb")
out_io = open(path, "rb")
yield out_io
except UnicodeDecodeError:
out_io = gzip.open(path, "rb")
Expand Down Expand Up @@ -122,7 +133,7 @@ def _get_mg_info(header_element: ElementBase) -> ElementBase:


def _read_num_events(mg_info: ElementBase) -> int:
re_num = re.findall("\d+", mg_info.text)
re_num = re.findall(r"\d+", mg_info.text)
num_events = int(re_num[0])
return num_events

Expand Down Expand Up @@ -388,7 +399,7 @@ def _event_duplicator(
dup_strat: Callable[[Iterator[ElementBase]], Iterator[ElementBase]],
) -> Optional[bytes]:
tiled_lists = (self._event_iter for _ in range(repeats))
dup_events = chain.from_iterable(dup_strat(tiled_lists))
dup_events = it.chain.from_iterable(dup_strat(tiled_lists))
dup_event_copies = map(deepcopy, dup_events)
root = self._build_root(dup_event_copies)
header = root[0]
Expand All @@ -403,3 +414,109 @@ def _event_duplicator(
return None
else:
return _root_to_bytes(root)


@dc.dataclass
class LheEvent:
"""Data structure holding Les Houches event data.
:group: LesHouches
.. versionadded:: 0.4.0
Attributes
----------
pdg : ndarray[int32]
PDG / MCPID identification codes for particle species.
pmu : ndarray[float64]
Four-momenta of particles, in (px, py, pz, E) order.
status : ndarray[int16]
Codes referring to the role of particles in the event, **eg.**
beam particle, incoming, outgoing, **etc.**
helicity : ndarray[int16]
Spin polarity of the particles.
color : ndarray[int32]
Color / anti-color code pairs of the particles.
"""

pdg: npt.NDArray[np.int32]
pmu: npt.NDArray[np.float64]
status: npt.NDArray[np.int16]
helicity: npt.NDArray[np.int16]
color: npt.NDArray[np.int32]


def _event_text_parse(event_text: str) -> LheEvent:
schema = {
"pdg": {"idx": 0, "dtype": np.int32},
"status": {"idx": 1, "dtype": np.int16},
"color": {"idx": slice(4, 6), "dtype": np.int32},
"pmu": {"idx": slice(6, 10), "dtype": np.float64},
"helicity": {"idx": 12, "dtype": np.int16},
}
lines = event_text.strip().split("\n")[1:]
data = np.loadtxt(iter(lines), dtype=np.float64)
return LheEvent(
**{
name: data[:, meta["idx"]].astype(meta["dtype"])
for name, meta in schema.items()
}
)


def event_iter_text(source: _LHE_STORAGE) -> Iterator[str]:
"""Iterates over a LHE file, yielding the text contained within an
event block.
See https://arxiv.org/abs/hep-ph/0109068 for the data layout.
:group: LesHouches
.. versionadded:: 0.4.0
Parameters
----------
source : Pathlike, string, or bytes
The variable or filepath containing the LHE data. May be a path,
url, string, or bytes object. Gzip compression is allowed.
Yields
------
str
The text block contained in each successive event.
"""
with source_adapter(source) as xml_source:
event_parser = etree.iterparse(
source=xml_source, tag=("event",), **_parse_kwargs
)
for _, event in event_parser:
yield event.text.strip()


def event_iter(source: _LHE_STORAGE) -> Iterator[LheEvent]:
"""Iterates over a LHE file, yielding ``LheEvent`` objects,
providing attribute access to numpy arrays, containing particle
data.
:group: LesHouches
.. versionadded:: 0.4.0
Parameters
----------
source : Pathlike, string, or bytes
The variable or filepath containing the LHE data. May be a path,
url, string, or bytes object. Gzip compression is allowed.
Yields
------
LheEvent
Numpy interface to the event data.
Notes
-----
This is a wrapper around ``event_iter_text()``, but provides only a
subset of the fields available as attributes. For more complete data
you may wish to use ``event_iter_text()`` and parse the strings
yourself.
"""
yield from map(_event_text_parse, event_iter_text(source))

0 comments on commit 1ad071a

Please sign in to comment.