diff --git a/requirements.txt b/requirements.txt index d7479d5..683f27b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,6 @@ # List required packages in this file, one per line. numpy numba -os pandas -scipy.spatial -concurrent.futures -multiprocessing -time -tdqm -gc -pyCHX +scipy +# pyCHX diff --git a/tpx3awkward/__init__.py b/tpx3awkward/__init__.py index 628ae79..c56bf1b 100644 --- a/tpx3awkward/__init__.py +++ b/tpx3awkward/__init__.py @@ -1,5 +1,6 @@ from pathlib import Path from . import _version +from ._utils import ingest_from_files def _get_version(): diff --git a/tpx3awkward/_utils.py b/tpx3awkward/_utils.py index ffdef7d..7f6498a 100644 --- a/tpx3awkward/_utils.py +++ b/tpx3awkward/_utils.py @@ -1,17 +1,15 @@ import os -import numpy as np from pathlib import Path +from typing import TypeVar, Union, Dict, List, Tuple +import numpy as np from numpy.typing import NDArray -from typing import TypeVar, Union, Dict, Set, List import numba import pandas as pd from scipy.spatial import KDTree -import concurrent.futures import multiprocessing -import time from tqdm import tqdm -from pyCHX.chx_packages import db, get_sid_filenames -import gc +import warnings + IA = NDArray[np.uint64] UnSigned = TypeVar("UnSigned", IA, np.uint64) @@ -27,21 +25,26 @@ def raw_as_numpy(fpath: Union[str, Path]) -> IA: ---------- """ - with open(fpath, "rb") as fin: - return np.frombuffer(fin.read(), dtype=" UnSigned: return v >> np.uint64(shift) & np.uint64(2**width - 1) -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) +def matches_nibble(data, nibble) -> numba.boolean: + return (int(data) >> 60) == nibble + + +@numba.jit(nopython=True, cache=True) def is_packet_header(v: UnSigned) -> UnSigned: + """Identify packet headers by magic number (TPX3 as ascii on lowest 8 bytes]""" return get_block(v, 32, 0) == 861425748 -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def classify_array(data: IA) -> NDArray[np.uint8]: """ Create an array the same size as the data classifying 64bit uint by type. @@ -55,7 +58,6 @@ def classify_array(data: IA) -> NDArray[np.uint8]: 6: frame driven data (id'd via 0xA upper nibble) (??) """ output = np.zeros_like(data, dtype=" NDArray[np.uint8]: return output -@numba.jit(nopython=True) +@numba.jit(nopython=True, cache=True) def _shift_xy(chip, row, col): # TODO sort out if this needs to be paremeterized out = np.zeros(2, "u4") @@ -92,153 +94,192 @@ def _shift_xy(chip, row, col): return out -@numba.jit(nopython=True) -def _ingest_raw_data(data: IA): - types = np.zeros_like(data, dtype="> np.uint(60) - # probably a better way to do this, but brute force! 
- types[~is_header & (nibble == 0xB)] = 2 - types[~is_header & (nibble == 0x6)] = 3 - types[~is_header & (nibble == 0x4)] = 4 - types[~is_header & (nibble == 0x7)] = 5 - - # sort out how many photons we have - total_photons = np.sum(types == 2) - - # allocate the return arrays - x = np.zeros(total_photons, dtype="u2") - y = np.zeros(total_photons, dtype="u2") - pix_addr = np.zeros(total_photons, dtype="u2") - ToA = np.zeros(total_photons, dtype="u2") - ToT = np.zeros(total_photons, dtype="u4") - FToA = np.zeros(total_photons, dtype="u2") - SPIDR = np.zeros(total_photons, dtype="u2") - chip_number = np.zeros(total_photons, dtype="u1") - basetime = np.zeros(total_photons, dtype="u8") - timestamp = np.zeros(total_photons, dtype="u8") - - photon_offset = 0 - chip = np.uint16(0) - expected_msg_count = np.uint16(0) - msg_run_count = np.uint(0) - - heartbeat_lsb = np.uint64(0) - heartbeat_msb = np.uint64(0) +@numba.jit(nopython=True, cache=True) +def decode_xy(msg, chip): + # these names and math are adapted from c++ code + l_pix_addr = (msg >> np.uint(44)) & np.uint(0xFFFF) + # This is laid out 16ibts which are 2 interleaved 8 bit unsigned ints + # CCCCCCCRRRRRRCRR + # |dcol ||spix|^|| + # | 7 || 6 |1|2 + # + # The high 7 bits of the column + # '1111111000000000' + dcol = (l_pix_addr & np.uint(0xFE00)) >> np.uint(8) + # The high 6 bits of the row + # '0000000111111000' + spix = (l_pix_addr & np.uint(0x01F8)) >> np.uint(1) + rowcol = _shift_xy( + chip, + # add the low 2 bits of the row + # '0000000000000011' + spix + (l_pix_addr & np.uint(0x3)), + # add the low 1 bit of the column + # '0000000000000100' + dcol + ((l_pix_addr & np.uint(0x4)) >> np.uint(2)), + ) + return rowcol[1], rowcol[0] + + +@numba.jit(nopython=True, cache=True) +def get_spidr(msg): + return msg & np.uint(0xFFFF) + + +@numba.jit(nopython=True, cache=True) +def decode_message(msg, chip, heartbeat_time: np.uint64 = 0): + """Decode TPX3 packages of the second type corresponding to photon events (id'd via 0xB upper nibble) + + Parameters + ---------- + msg (uint64): tpx3 binary message + chip (uint8): chip ID, 0..3 + heartbeat_time (uint64): + + # bit position : ... 44 40 36 32 28 24 20 16 12 8 7 4 3 0 + # 0xFFFFC0000000 : 1111 1111 1111 1111 1100 0000 0000 0000 0000 0000 0000 0000 + # 0x3FFFFFFF : 0000 0000 0000 0000 0011 1111 1111 1111 1111 1111 1111 1111 + # SPIDR : ssss ssss ssss ssss ssss + # ToA : tt tttt tttt + # ToA_coarse : ss ssss ssss ssss ssss sstt tttt tttt + # pixel_bits : ^^ + # FToA : ffff + # count : ss ssss ssss ssss ssss sstt tttt tttt ffff (FToA is subtracted) + # phase : pppp + # 0x10000000 : 1 0000 0000 0000 0000 0000 0000 0000 + # heartbeat_time : hhhh hhhh hhhh hhhh hhhh hhhh hhhh hhhh hhhh hhhh hhhh hhhh + # heartbeat_bits : ^^ + # global_time : hhhh hhhh hhhh hhss ssss ssss ssss ssss sstt tttt tttt ffff + + # count = (ToA_coarse << np.uint(4)) - FToA # Counter value, in multiples of 1.5625 ns + + Returns + ---------- + Arrays of pixel coordinates, ToT, and timestamps. 
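    Notes
    -----
    Added illustration, not taken from the vendor code: the returned ``ts`` is in
    units of 25 ns / 16 = 1.5625 ns (the same constant exposed later in this module
    as ``TIMESTAMP_VALUE``), so a difference between two decoded timestamps converts
    to nanoseconds as

        dt_ns = (ts2 - ts1) * 1.5625

    e.g. a gap of 640 counts corresponds to 1 microsecond.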
+ """ + msg, heartbeat_time = np.uint64(msg), np.uint64(heartbeat_time) # Force types + x, y = decode_xy(msg, chip) # or use x1, y1 = calculateXY(msg, chip) from the Vendor's code + # ToA is 14 bits + ToA = (msg >> np.uint(30)) & np.uint(0x3FFF) + # ToT is 10 bits; report in ns + ToT = ((msg >> np.uint(20)) & np.uint(0x3FF)) * 25 + # FToA is 4 bits + FToA = (msg >> np.uint(16)) & np.uint(0xF) + # SPIDR time is 16 bits + SPIDR = np.uint64(get_spidr(msg)) + + ToA_coarse = (SPIDR << np.uint(14)) | ToA + # pixel_bits are the two highest bits of the SPIDR (i.e. the pixelbits range covers 262143 spidr cycles) + pixel_bits = np.int8((ToA_coarse >> np.uint(28)) & np.uint(0x3)) + # heart_bits are the bits at the same positions in the heartbeat_time + heart_bits = np.int8((heartbeat_time >> np.uint(28)) & np.uint(0x3)) + # Adjust heartbeat_time based on the difference between heart_bits and pixel_bits + diff = heart_bits - pixel_bits + # diff +/-1 occur when pixelbits step up + # diff +/-3 occur when spidr counter overfills + # diff can also be 0 -- then nothing happens -- but never +/-2 + if (diff == 1 or diff == -3) and (heartbeat_time > np.uint(0x10000000)): + heartbeat_time -= np.uint(0x10000000) + elif diff == -1 or diff == 3: + heartbeat_time += np.uint(0x10000000) + # Construct globaltime + global_time = (heartbeat_time & np.uint(0xFFFFFFFC0000000)) | (ToA_coarse & np.uint(0x3FFFFFFF)) + # Phase correction + phase = np.uint((x / 2) % 16) or np.uint(16) + # Construct timestamp with phase correction + ts = (global_time << np.uint(4)) - FToA + phase + + return x, y, ToT, ts + + +@numba.jit(nopython=True, cache=True) +def _ingest_raw_data(data): + + chips = np.zeros_like(data, dtype=np.uint8) + x = np.zeros_like(data, dtype="u2") + y = np.zeros_like(data, dtype="u2") + tot = np.zeros_like(data, dtype="u4") + ts = np.zeros_like(data, dtype="u8") + heartbeat_lsb = None #np.uint64(0) heartbeat_time = np.uint64(0) - # loop over the packet headers (can not vectorize this with numpy) - for j in range(len(data)): - msg = data[j] - typ = types[j] - if typ == 1: - # 1: packet header (id'd via TPX3 magic number) + + photon_count, chip_indx, msg_run_count, expected_msg_count = 0, 0, 0, 0 + for msg in data: + if is_packet_header(msg): + # Type 1: packet header (id'd via TPX3 magic number) if expected_msg_count != msg_run_count: - print("missing messages!", msg) - # extract scalar information from the header + print("Missing messages!", msg) - # "number of pixels in chunk" is given in bytes not words - # and means all words in the chunk, not just "photons" + # extract the chip number for the following photon events + chip_indx = np.uint8(get_block(msg, 8, 32)) + + # "number of pixels in chunk" is given in bytes not words and means all words in the chunk, not just "photons" expected_msg_count = get_block(msg, 16, 48) // 8 - # what chip we are on - chip = np.uint8(get_block(msg, 8, 32)) msg_run_count = 0 - elif typ == 2 or typ == 6: - # 2: photon event (id'd via 0xB upper nibble) - # 6: frame driven data (id'd via 0xA upper nibble) (??) 
- - # | - - # pixAddr is 16 bits - # these names and math are adapted from c++ code - l_pix_addr = pix_addr[photon_offset] = (msg >> np.uint(44)) & np.uint(0xFFFF) - # This is laid out 16ibts which are 2 interleaved 8 bit unsigned ints - # CCCCCCCRRRRRRCRR - # |dcol ||spix|^|| - # | 7 || 6 |1|2 - # - # The high 7 bits of the column - # '1111111000000000' - dcol = (l_pix_addr & np.uint(0xFE00)) >> np.uint(8) - # The high 6 bits of the row - # '0000000111111000' - spix = (l_pix_addr & np.uint(0x01F8)) >> np.uint(1) - rowcol = _shift_xy( - chip, - # add the low 2 bits of the row - # '0000000000000011' - spix + (l_pix_addr & np.uint(0x3)), - # add the low 1 bit of the column - # '0000000000000100' - dcol + ((l_pix_addr & np.uint(0x4)) >> np.uint(2)), - ) - col = x[photon_offset] = rowcol[1] - y[photon_offset] = rowcol[0] - # ToA is 14 bits - ToA[photon_offset] = (msg >> np.uint(30)) & np.uint(0x3FFF) - # ToT is 10 bits - # report in ns - ToT[photon_offset] = ((msg >> np.uint(20)) & np.uint(0x3FF)) * 25 - # FToA is 4 bits - l_FToA = FToA[photon_offset] = (msg >> np.uint(16)) & np.uint(0xF) - # SPIDR time is 16 bits - SPIDR[photon_offset] = msg & np.uint(0xFFFF) - # chip number (this is a constant) - chip_number[photon_offset] = chip - # heartbeat time - basetime[photon_offset] = heartbeat_time - - ToA_coarse = (SPIDR[photon_offset] << np.uint(14)) | ToA[photon_offset] - pixelbits = int((ToA_coarse >> np.uint(28)) & np.uint(0x3)) - heartbeat_time_bits = int((heartbeat_time >> np.uint(28)) & np.uint(0x3)) - diff = heartbeat_time_bits - pixelbits - if diff == 1 or diff == -3: - heartbeat_time -= np.uint(0x10000000) - elif diff == -1 or diff == 3: - heartbeat_time += np.uint(0x10000000) - globaltime = (heartbeat_time & np.uint(0xFFFFC0000000)) | (ToA_coarse & np.uint(0x3FFFFFFF)) - - timestamp[photon_offset] = (globaltime << np.uint(12)) - (l_FToA << np.uint(8)) - # correct for phase shift - phase = np.uint((col / 2) % 16) - if phase == 0: - timestamp[photon_offset] += 16 << 8 - else: - timestamp[photon_offset] += phase << 8 - photon_offset += 1 + elif matches_nibble(msg, 0xB): + # Type 2: photon event (id'd via 0xB upper nibble) + chips[photon_count] = chip_indx + _x, _y, _tot, _ts = decode_message(msg, chip_indx, heartbeat_time=heartbeat_time) + x[photon_count] = _x + y[photon_count] = _y + tot[photon_count] = _tot + ts[photon_count] = _ts + + if (photon_count > 0) and (_ts > ts[photon_count-1]) and (_ts - ts[photon_count-1] > 2**30): + prev_ts = ts[:photon_count] # This portion needs to be adjusted + # Find what the current timestamp would be without global heartbeat + _, _, _, _ts_0 = decode_message(msg, chip_indx, heartbeat_time=np.uint64(0)) + # Check if there is a SPIDR rollover in the beginning of the file but before heartbeat was received + head_max = max(prev_ts[:10]) + tail_min = min(prev_ts[-10:]) + # Compare the difference with some big number (e.g. 1/4 of SPIDR) + if (head_max > tail_min) and (head_max - tail_min > 2**32): + prev_ts[prev_ts < 2**33] += np.uint64(2**34) + _ts_0 += np.uint64(2**34) + # Ajust already processed timestamps + ts[:photon_count] = prev_ts + (_ts - _ts_0) + + photon_count += 1 msg_run_count += 1 - elif typ == 3: - # 3: TDC timstamp (id'd via 0x6 upper nibble) + + elif matches_nibble(msg, 0x6): + # Type 3: TDC timstamp (id'd via 0x6 upper nibble) # TODO: handle these! 
msg_run_count += 1 - elif typ == 4: - # 4: global timestap (id'd via 0x4 upper nibble) - subheader = (msg >> np.uint(56)) & np.uint(0x0F) + + elif matches_nibble(msg, 0x4): + # Type 4: global timestap (id'd via 0x4 upper nibble) + subheader = (msg >> np.uint(56)) & np.uint64(0x0F) if subheader == 0x4: - # timer lsb, 32 bits of time - heartbeat_lsb = (msg >> np.uint(16)) & np.uint(0xFFFFFFFF) + # timer LSB, 32 bits of time + heartbeat_lsb = (msg >> np.uint(16)) & np.uint64(0xFFFFFFFF) elif subheader == 0x5: - # timer msb - - time_msg = (msg >> np.uint(16)) & np.uint(0xFFFF) - heartbeat_msb = time_msg << np.uint(32) - # TODO the c++ code has large jump detection, do not understand why - heartbeat_time = heartbeat_msb | heartbeat_lsb + # timer MSB -- only matters if LSB has been received already + if heartbeat_lsb is not None: + heartbeat_msb = ((msg >> np.uint(16)) & np.uint64(0xFFFF)) << np.uint(32) + heartbeat_time = (heartbeat_msb | heartbeat_lsb) + # TODO the c++ code has large jump detection, do not understand why else: - raise Exception("unknown header") + raise Exception(f"Unknown subheader {subheader} in the Global Timestamp message") + pass msg_run_count += 1 - elif typ == 5: - # 5: "command" data (id'd via 0x7 upper nibble) + + elif matches_nibble(msg, 0x7): + # Type 5: "command" data (id'd via 0x7 upper nibble) # TODO handle this! msg_run_count += 1 + else: - raise Exception("Not supported") + raise Exception(f"Not supported: {msg}") - return x, y, pix_addr, ToA, ToT, FToA, SPIDR, chip_number, basetime, timestamp + + # Sort the timestamps + indx = np.argsort(ts[:photon_count], kind="mergesort") # is mergesort the best here? wondering if this could be optimized + x, y, tot, ts, chips = x[indx], y[indx], tot[indx], ts[indx], chips[indx] + + return x, y, tot, ts, chips def ingest_raw_data(data: IA) -> Dict[str, NDArray]: @@ -253,24 +294,18 @@ def ingest_raw_data(data: IA) -> Dict[str, NDArray]: Returns ------- Dict[str, NDArray] - Keys of x, y, pix_addr, ToA, ToT, FToA, SPIDR, chip_number + Keys of x, y, ToT, chip_number """ return { k.strip(): v - for k, v in zip( - "x, y, pix_addr, ToA, ToT, FToA, SPIDR, chip_number, basetime, timestamp".split(","), - _ingest_raw_data(data), - ) + for k, v in zip("x, y, ToT, t, chip".split(","), _ingest_raw_data(data)) } -# ^-- tom wrote -# v-- justin wrote + """ Some basic functions that help take the initial output of ingest_raw_data and finish the processing. """ - - -def raw_to_sorted_df(fpath: Union[str, Path]) -> pd.DataFrame: +def tpx_to_raw_df(fpath: Union[str, Path]) -> pd.DataFrame: """ Parses a .tpx3 file and returns the raw data after timesorting. @@ -285,28 +320,7 @@ def raw_to_sorted_df(fpath: Union[str, Path]) -> pd.DataFrame: DataFrame of raw events from the .tpx3 file. """ raw_df = pd.DataFrame(ingest_raw_data(raw_as_numpy(fpath))) - return raw_df.sort_values("timestamp").reset_index(drop=True) - - -def condense_raw_df(df: pd.DataFrame) -> pd.DataFrame: - """ - Condenses the raw dataframe with only key information necesary for the analysis. Returns a dataframe with timestamp (renamed to t), x, y, and ToT. - - Parameters - ---------- - df : pd.DataFrame - DataFrame generated using raw_to_sorted_df(). - - Returns - ------- - pd.DataFrame - Dataframe condensed to only contain pertinent information for analysis. 
- """ - cdf = df[["timestamp", "x", "y", "ToT"]] - cdf = cdf.rename( - columns={"timestamp": "t"} - ) # obviously not necessary, just easier to type 't' a lot than 'timestamp' - return cdf + return raw_df.sort_values("t").reset_index(drop=True) # should we specify the sorting algorithm? at this point, it should be sorted anyway, but I think dataframes need to be explicitly sorted for use in e.g. merge_asof? def drop_zero_tot(df: pd.DataFrame) -> pd.DataFrame: @@ -323,26 +337,25 @@ def drop_zero_tot(df: pd.DataFrame) -> pd.DataFrame: pd.DataFrame df with only the events with ToT > 0 """ - fdf = df[df["ToT"] > 0] - return fdf + return df[df["ToT"] > 0] """ Functions to help perform clustering and centroiding on raw data. """ -TIMESTAMP_VALUE = ((1e-9) / 4096) * 25 -MICROSECOND = 10 ** (-6) +TIMESTAMP_VALUE = 1.5625*1e-9 # each raw timestamp is 1.5625 seconds +MICROSECOND = 1e-6 # We have had decent success with these values, but do not know for sure if they are optimal. -DEFAULT_CLUSTER_RADIUS = 2 -DEFAULT_CLUSTER_TW_MICROSECONDS = 0.5 +DEFAULT_CLUSTER_RADIUS = 3 +DEFAULT_CLUSTER_TW_MICROSECONDS = 0.3 DEFAULT_CLUSTER_TW = int(DEFAULT_CLUSTER_TW_MICROSECONDS * MICROSECOND / TIMESTAMP_VALUE) -def neighbor_set_from_df( +def cluster_df( df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS -) -> tuple[np.ndarray, Set[tuple[int]]]: +) -> tuple[np.ndarray, np.ndarray]: """ Uses scipy.spatial's KDTree to cluster raw input data. Requires a time window for clustering adjacent pixels and the total search radius. @@ -363,90 +376,41 @@ def neighbor_set_from_df( An set of tuples of the indices of the clustered events. The outer set is each cluster, and the inner tuples are the events in each cluster. """ events = np.array( - df[["t", "x", "y", "ToT", "t"]].values + df[["t", "x", "y", "ToT", "t"]].values # raw data stored in dataframe. duplicate timestamp column as the first instance is windowed ) # first three columns are for search radius of KDTree events[:, 0] = np.floor_divide(events[:, 0], tw) # bin by the time window - tree = KDTree(events[:, :3]) # generate KDTree based off the coordinates + tree = KDTree(events[:, :3]) # generate KDTree based off the coordinates (t/timewindow, x, y) neighbors = tree.query_ball_tree( tree, radius ) # compare tree against itself to find neighbors within the search radius - clusters = set(tuple(n) for n in neighbors) # turn the list of lists into a set of tuples - return events, clusters - - -def cluster_stats( - clusters: Set[tuple[int]] -) -> tuple[int]: - """ - Determines basic information about cluster information, such as the number of clusters and size of the largest cluster. - - Parameters - ---------- - clusters : Set[tuple[int]] - The set of tuples of clusters from neighbor_set_from_df() - - Returns - ------- - int - The total number of clusters - int - The number of events in the largest cluster - """ - num_clusters = len(clusters) - max_cluster = max(map(len, clusters)) - return num_clusters, max_cluster - + if len(neighbors) >= 2147483647: # performance is marginally better if can use int32 for the indices, so check for that + dtype = np.int64 + else: + dtype = np.int32 + return pd.DataFrame(neighbors).fillna(-1).astype(dtype).drop_duplicates().values, events[:, 1:] # a bit weird, but faster than using sets and list operators to unpack neighbors -def create_cluster_arr( - clusters: Set[tuple[int]], num_clusters: int, max_cluster: int -) -> np.ndarray: # is there a better way to do this? 
- """ - Converts the clusters from a set of tuples of indices to an 2D numpy array format which can be efficiently iterated through with Numba. - Parameters - ---------- - clusters : Set[tuple[int]] - The set of tuples of clusters from neighbor_set_from_df() - num_clusters : int - The total number of clusters - max_cluster : int - The number of events in the largest cluster - - Returns - ------- - np.ndarray - The cluster data now in a 2D numpy array. - """ - cluster_arr = np.full( - (num_clusters, max_cluster), -1, dtype=np.int64 - ) # fill with -1; these will be passed later - for cluster_num, cluster in enumerate(clusters): - for event_num, event in enumerate(cluster): - cluster_arr[cluster_num, event_num] = event - return cluster_arr - - -@numba.jit(nopython=True) -def cluster_arr_to_cent( - cluster_arr: np.ndarray, events: np.ndarray, num_clusters: int, max_cluster: int -) -> tuple[np.ndarray]: +@numba.jit(nopython=True, cache=True) +def centroid_clusters( + cluster_arr: np.ndarray, events: np.ndarray +) -> tuple[np.ndarray]: """ Performs the centroiding of a group of clusters using Numba. Note I originally attempted to unpack the clusters using list comprehensions, but this approach is significantly faster. Parameters ---------- - clusters : Set[tuple[int]] - The set of tuples of clusters from neighbor_set_from_df() - num_clusters : int - The total number of clusters - max_cluster : int - The number of events in the largest clust + clusters : nd.array + The numpy representation of the clusters' event indices. + events : nd.array + The numpy represetation of the event data. Returns ------- tuple[np.ndarray] t, xc, yc, ToT_max, ToT_sum, and n (number of events) in each cluster. """ + num_clusters = cluster_arr.shape[0] + max_cluster = cluster_arr.shape[1] t = np.zeros(num_clusters, dtype="uint64") xc = np.zeros(num_clusters, dtype="float32") yc = np.zeros(num_clusters, dtype="float32") @@ -459,13 +423,13 @@ def cluster_arr_to_cent( for event_num in range(max_cluster): event = cluster_arr[cluster_id, event_num] if event > -1: # if we have an event here - if events[event, 3] > _ToT_max: # find the max ToT, assign, use that time - _ToT_max = events[event, 3] - t[cluster_id] = events[event, 4] + if events[event, 2] > _ToT_max: # find the max ToT, assign, use that time + _ToT_max = events[event, 2] + t[cluster_id] = events[event, 3] ToT_max[cluster_id] = _ToT_max - xc[cluster_id] += events[event, 1] * events[event, 3] # x and y centroids by time over threshold - yc[cluster_id] += events[event, 2] * events[event, 3] - ToT_sum[cluster_id] += events[event, 3] # calcuate sum + xc[cluster_id] += events[event, 0] * events[event, 2] # x and y centroids by time over threshold + yc[cluster_id] += events[event, 1] * events[event, 2] + ToT_sum[cluster_id] += events[event, 2] # calcuate sum n[cluster_id] += np.ubyte(1) # number of events in cluster else: break @@ -500,60 +464,15 @@ def ingest_cent_data( } -def cent_to_numpy( - cluster_arr: np.ndarray, events: int, num_clusters: int, max_cluster: int -) -> Dict[str, np.ndarray]: - """ - Wrapper function to perform ingest_cent_data(cluster_arr_to_cent()) - - Parameters - ---------- - cluster_arr : np.ndarray - The array of cluster events from create_cluster_arr() - events : int - Number of photon events - num_clusters : int - The total number of clusters - max_cluster : int - The number of events in the largest clust - - Returns - ------- - Dict[str, np.ndarray] - Keys of t, xc, yc, ToT_max, ToT_sum, and n (number of events) in each cluster. 
- """ - return ingest_cent_data(cluster_arr_to_cent(cluster_arr, events, num_clusters, max_cluster)) - - -def cent_to_df( - cd_np: Dict[str, np.ndarray] -) -> pd.DataFrame: - """ - Returns the centroided dataframe from the zipped inputs. - - Parameters - ---------- - cd_np : Dict[str, np.ndarray] - Dictionary of the clustered data. - - Returns - ------- - pd.DataFrame - Time sorted dataframe of the centroids. - """ - cent_df = pd.DataFrame(cd_np) - return cent_df.sort_values("t").reset_index(drop=True) - - -def raw_df_to_cluster_df( - raw_df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS +def raw_df_to_cent_df( + df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS ) -> pd.DataFrame: """ Uses functions defined herein to take Dataframe of raw data and return dataframe of clustered data. Parameters ---------- - raw_df : pd.DataFrame + df : pd.DataFrame Pandas DataFrame of the raw data tw : int The time window to be considered "coincident" for clustering purposes @@ -565,11 +484,10 @@ def raw_df_to_cluster_df( pd.DataFrame Pandas DataFrame of the centroided data. """ - filt_cond_raw_df = drop_zero_tot(condense_raw_df(raw_df)) - events, clusters = neighbor_set_from_df(filt_cond_raw_df, tw, radius) - num_clusters, max_cluster = cluster_stats(clusters) - cluster_arr = create_cluster_arr(clusters, num_clusters, max_cluster) - return cent_to_df(cent_to_numpy(cluster_arr, events, num_clusters, max_cluster)) + fdf = drop_zero_tot(df) + cluster_arr, events = cluster_df(fdf, tw, radius) + data = centroid_clusters(cluster_arr, events) + return pd.DataFrame(ingest_cent_data(data)).sort_values("t").reset_index(drop=True) # should we specify the sort type here? this should be *almost* completely sorted already def add_centroid_cols( @@ -591,107 +509,31 @@ def add_centroid_cols( Originally dataframe with new columns x, y, and t_ns added. """ if gap: - df.loc[df['xc'] >= 255.5, 'xc'] += 2 - df.loc[df['yc'] >= 255.5, 'yc'] += 2 - df["x"] = np.round(df["xc"]).astype(np.uint16) + df.loc[df["xc"] >= 255.5, "xc"] += 2 + df.loc[df["yc"] >= 255.5, "yc"] += 2 + df["x"] = np.round(df["xc"]).astype(np.uint16) # sometimes you just want to know the closest pixel df["y"] = np.round(df["yc"]).astype(np.uint16) - df["t_ns"] = df["t"] / 4096 * 25 + df["t_ns"] = (df["t"].astype(np.float64) * 1.5625) # better way to convert to ns while maintaining precision? return df """ -A bunch of functions to help process multiple related .tpx3 files into Pandas dataframes stored in .h5 files. +Functions to help process multiple related .tpx3 files into Pandas dataframes stored in .h5 files. """ RAW_H5_SUFFIX = "" CENT_H5_SUFFIX = "_cent" CONCAT_H5_SUFFIX = "_cent" -def extract_fpaths_from_sid( - sid: int -) -> List[str]: - """ - Extract file paths from a given sid. - - Parameters - ---------- - sid : int - Short ID of a BlueSky scan - - Returns - ------- - List[str] - Filepaths of the written .tpx3, as recorded in Tiled - """ - return list(db[sid].table()["tpx3_files_raw_filepaths"].to_numpy()[0]) - - -def extract_uid_from_fpaths( - fpaths: List[str] -) -> str: - """ - Extract scan unique ID from file paths. - - Parameters - ---------- - fpaths : List[str] - List of the filepaths. - - Returns - ------- - str - String of the first file's unique ID. - - """ - return os.path.basename(fpaths[0])[:23] - - -def extract_dir_from_fpaths( - fpaths: List[str] -) -> str: - """ - Extract directory from file paths. 
- - Parameters - ---------- - fpaths : List[str] - List of the filepaths. - - Returns - ------- - str - String of the first file's directory. - - """ - return os.path.dirname(fpaths[0]) - - -def extract_uid_from_sid( - sid: int -) -> str: - """ - Extract user ID from a given sid. - - Parameters - ---------- - sid : int - - Returns - ------- - str - String of the short ID's corresponding unique ID. - - """ - return extract_uid_from_fpaths(extract_fpaths_from_sid(sid)) - - -def convert_file( - fpath: Union[str, Path], time_window_microsecond: float = DEFAULT_CLUSTER_TW_MICROSECONDS, radius: int = DEFAULT_CLUSTER_RADIUS, print_details: bool = False +def convert_tpx_file( + fpath: Union[str, Path], time_window_microsecond: float = DEFAULT_CLUSTER_TW_MICROSECONDS, radius: int = DEFAULT_CLUSTER_RADIUS, print_details: bool = False, overwrite: bool = True ): """ Convert a .tpx3 file into raw and centroided Pandas dataframes, which are stored in .h5 files. + TO DO: Args to specify output directory (default will be same directory as .tpx3 file as is now). + Parameters ---------- fpath : Union[str, Path] @@ -702,193 +544,122 @@ def convert_file( The radius, in pixels, to perform centroiding print_details : bool = False Boolean toggle about whether to print detailed data. + overwrite : bool = True + Boolean toggle about whether to overwrite pre-existing data. + """ fname, ext = os.path.splitext(fpath) dfname = "{}{}.h5".format(fname, RAW_H5_SUFFIX) dfcname = "{}{}.h5".format(fname, CONCAT_H5_SUFFIX) if ext == ".tpx3" and os.path.exists(fpath): - file_size = os.path.getsize(fpath) - have_df = os.path.exists(dfname) - have_dfc = os.path.exists(dfcname) - - if have_df and have_dfc: - print("-> {} exists, skipping.".format(dfname)) - else: - print("-> Processing {}, size: {:.1f} MB".format(fpath, file_size/1000000)) - time_window = time_window_microsecond * 1e-6 - time_stamp_conversion = 6.1e-12 - timedif = int(time_window / time_stamp_conversion) - - if print_details: - print("Loading {} data into dataframe...".format(fpath)) - df = raw_to_sorted_df(fpath) - num_events = df.shape[0] - - if print_details: - print("Loading {} complete. {} events found. Saving to: {}".format(fpath, num_events, dfname)) - df.to_hdf(dfname, key='df', mode='w') - - if print_details: - print("Saving {} complete. Beginning clustering...".format(dfname)) - df_c = raw_df_to_cluster_df(df, timedif, radius) - num_clusters = df_c.shape[0] + + try: + file_size = os.path.getsize(fpath) + have_df = os.path.exists(dfname) + have_dfc = os.path.exists(dfcname) + + if have_df and have_dfc and not overwrite: + + print("-> {} exists, skipping.".format(dfname)) + + else: + + if print_details: + print("-> Processing {}, size: {:.1f} MB".format(fpath, file_size/1000000)) + + time_window = time_window_microsecond * 1e-6 + time_stamp_conversion = 6.1e-12 + timedif = int(time_window / time_stamp_conversion) + + if print_details: + print("Loading {} data into dataframe...".format(fpath)) + + df = tpx_to_raw_df(fpath) + num_events = df.shape[0] + + if print_details: + print("Loading {} complete. {} events found. Saving to: {}".format(fpath, num_events, dfname)) + + df.to_hdf(dfname, key="df", format="table", mode="w") + + if print_details: + print("Saving {} complete. Beginning clustering...".format(dfname)) + + df_c = raw_df_to_cent_df(df, timedif, radius) + + num_clusters = df_c.shape[0] + + if print_details: + print("Clustering {} complete. {} clusters found. 
Saving to {}".format(fpath, num_clusters, dfcname)) + + df_c.to_hdf(dfcname, key="df", format="table", mode="w") + + if print_details: + print("Saving {} complete. Moving onto next file.".format(dfcname)) + + except Exception as e: + if print_details: - print("Clustering {} complete. {} clusters found. Saving to {}".format(fpath, num_clusters, dfcname)) - df_c.to_hdf(dfcname, key='df', mode='w') - print("Saving {} complete. Moving onto next file.".format(dfcname)) + print("Conversion of {} failed due to {}, moving on.".format(fpath,e)) + else: - print("File not found. Moving onto next file.") - - -def convert_tpx3_parallel( - fpaths: Union[str, Path], num_workers: int = None -): + if print_details: + print("File not found. Moving onto next file.") + + +def convert_tpx3_files_parallel(fpaths: Union[List[str], List[Path]], num_workers: int = None): """ - Convert a list of .tpx3 files in a parallel processing pool. + Convert a list .tpx3 files in parallel using multiprocessing and convert_tpx_file(). + + TO DO: Accept more arguments for convert_tpx_file. Parameters ---------- - fpaths : Union[str, Path] - .tpx3 file paths to convert in a parallel processing pool. - num_workers : int = None - Number of parallel workers to employ. + fpath : Union[str, Path] + .tpx3 file path + time_window_microsecond : float = DEFAULT_CLUSTER_TW_MICROSECONDS + The time window, in microseconds, to perform centroiding + radius : int = DEFAULT_CLUSTER_RADIUS + The radius, in pixels, to perform centroiding + print_details : bool = False + Boolean toggle about whether to print detailed data. + overwrite : bool = True + Boolean toggle about whether to overwrite pre-existing data. + """ - if num_workers == None: - num_cores = multiprocessing.cpu_count() - max_workers = num_cores-1 + if num_workers is None: + max_workers = multiprocessing.cpu_count() else: max_workers = num_workers - - with multiprocessing.Pool(processes=max_workers) as pool: - pool.map(convert_file, fpaths) - - print("Parallel conversion complete") - - -def convert_tpx3_st(fpaths: Union[str, Path]): - """ - Convert a list of .tpx3 files in a single thread. - - Parameters - ---------- - fpaths : Union[str, Path] - .tpx3 file paths to convert in a single thread. - """ - for file in fpaths: - convert_file(file) - -def get_cent_files( - uid: str, dir_name: Union[str, Path] -) -> List[str]: - """ - Gets a list of the centroided .h5 files from a given uid, sorted by sequence number. - - Parameters - ---------- - uid : str - The unique ID of the scan of which we want to get the files. + with multiprocessing.Pool(processes=max_workers) as pool: + for _ in tqdm(pool.imap_unordered(convert_tpx_file, fpaths), total=len(fpaths)): + pass - dir_name : Union[str, path] - Directory to look in for the files. - Returns - ------- - List[str] - List of the centroided file paths. - """ - cent_files = [ - os.path.join(dir_name, file) - for file in os.listdir(dir_name) - if file.endswith("{}.h5".format(CENT_H5_SUFFIX)) and str(uid) in file and len(os.path.basename(file)) == 44 - ] - - cent_files.sort(key=lambda f: int(os.path.splitext(os.path.basename(f))[0].split("_")[-2])) - return cent_files - - -def concat_cent_files( - cfpaths: List[Union[str, Path]] -): +def convert_tpx3_files(fpaths: Union[List[str], List[Path]], print_details = True): """ - Concatenates several centroided files together. - - Parameters - ---------- - cfpaths : List[str, Path] - List of the centroided .h5 files to concatenate together. 
- """ - dir_name = os.path.dirname(cfpaths[0]) - uid = extract_uid_from_fpaths(cfpaths) - - dfs = [pd.read_hdf(fpath, key='df') for fpath in tqdm(cfpaths)] - combined_df = pd.concat(dfs).reset_index(drop=True) - - save_path = os.path.join(dir_name, "{}{}.h5".format(uid, CONCAT_H5_SUFFIX)) - combined_df.to_hdf(save_path, key='df', mode='w') + Convert a list .tpx3 files in a single process using convert_tpx_file(). - print("-> Saving complete.") - - -def get_con_cent_file( - sid: int -) -> str: - """ - Gets the location of the concatenated centroid files of a given sid. + TO DO: Accept more arguments for convert_tpx_file. Parameters ---------- - sid : int - Short ID of whichto get the centroided file path + fpath : Union[str, Path] + .tpx3 file path + time_window_microsecond : float = DEFAULT_CLUSTER_TW_MICROSECONDS + The time window, in microseconds, to perform centroiding + radius : int = DEFAULT_CLUSTER_RADIUS + The radius, in pixels, to perform centroiding + print_details : bool = False + Boolean toggle about whether to print detailed data. + overwrite : bool = True + Boolean toggle about whether to overwrite pre-existing data. - Returns - ------- - str - Path of the centroided file. """ - fpaths = extract_fpaths_from_sid(sid) - uid = extract_uid_from_fpaths(fpaths) - dir_name = extract_dir_from_fpaths(fpaths) - cfpath = os.path.join(dir_name, "{}{}.h5".format(uid, CONCAT_H5_SUFFIX)) - - if os.path.exists(cfpath): - return cfpath - else: - print("-> Warning: {} does not exist".format(cfpath)) - return None - - -def convert_sids( - sids: List[int] -): - """ - Convert given sids by converting each .tpx3 file and then concatenating results together into a master dataframe. - - Parameters - ---------- - sids : List[int] - List of BlueSky scans' short IDs to convert. 
- """ - - for sid in sids: - print("\n\n---> Beginning sid: {} <---\n".format(sid)) + for file in fpaths: + convert_tpx_file(file, print_details = print_details) - tpx3fpaths = extract_fpaths_from_sid(sid) - dir_name = extract_dir_from_fpaths(tpx3fpaths) - num_tpx = len(tpx3fpaths) - uid = extract_uid_from_fpaths(tpx3fpaths) - - convert_tpx3_parallel(tpx3fpaths, num_workers=16) - centfpaths = get_cent_files(uid, dir_name) - num_cent = len(centfpaths) - - if num_tpx == num_cent: - print("---> Conversion numbers match") - concat_cent_files(centfpaths) - else: - print("---> Warning: conversion mismatch: tpx3={}, cent={}".format(num_tpx, num_cent)) - - print("---> Done with {}!".format(sid)) - gc.collect() + \ No newline at end of file diff --git a/tpx3awkward/examples/example.ipynb b/tpx3awkward/examples/example.ipynb new file mode 100644 index 0000000..6eeabb1 --- /dev/null +++ b/tpx3awkward/examples/example.ipynb @@ -0,0 +1,351 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "76010898-2082-49cb-9bff-8173fd79ad6e", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "import _utils as tpx\n", + "import pandas as pd\n", + "from tiled.client import from_uri\n", + "db = from_uri('https://tiled.nsls2.bnl.gov', 'dask')['chx']['raw']" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "59bb0e40-1686-413c-8595-c38655e2d0d6", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "sid = 143200 # example SID (1800 partitions at 1 sec/partition from NSLS-II quantum microscope project)" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "1b629cf7-1d46-49b9-a41b-43f3b9972f4e", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Raw dataframe: \n", + " x y ToT t chip\n", + "0 502 452 475 11301032 1\n", + "1 405 272 275 11302904 1\n", + "2 406 272 350 11302904 1\n", + "3 295 135 525 11303039 0\n", + "4 358 509 300 11303780 1\n", + "... ... ... ... ... ...\n", + "971254 200 46 550 651295354 3\n", + "971255 489 104 450 651296021 0\n", + "971256 389 21 300 651296305 0\n", + "971257 390 21 300 651296306 0\n", + "971258 37 325 75 651299518 2\n", + "\n", + "[971259 rows x 5 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n\n", + "0 11301032 502.000000 452.0 475 475 1\n", + "1 11302904 405.559998 272.0 350 625 2\n", + "2 11303039 295.000000 135.0 525 525 1\n", + "3 11303780 358.478271 509.0 300 575 2\n", + "4 11303973 412.409088 60.0 325 550 2\n", + "... ... ... ... ... ... ..\n", + "659582 651294809 326.000000 379.0 475 475 1\n", + "659583 651295354 200.000000 46.0 550 550 1\n", + "659584 651296021 489.000000 104.0 450 450 1\n", + "659585 651296305 389.500000 21.0 300 600 2\n", + "659586 651299518 37.000000 325.0 75 75 1\n", + "\n", + "[659587 rows x 6 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n x y \\\n", + "0 11301032 504.000000 454.0 475 475 1 504 454 \n", + "1 11302904 407.559998 274.0 350 625 2 408 274 \n", + "2 11303039 297.000000 135.0 525 525 1 297 135 \n", + "3 11303780 360.478271 511.0 300 575 2 360 511 \n", + "4 11303973 414.409088 60.0 325 550 2 414 60 \n", + "... ... ... ... ... ... .. ... ... 
\n", + "659582 651294809 328.000000 381.0 475 475 1 328 381 \n", + "659583 651295354 200.000000 46.0 550 550 1 200 46 \n", + "659584 651296021 491.000000 104.0 450 450 1 491 104 \n", + "659585 651296305 391.500000 21.0 300 600 2 392 21 \n", + "659586 651299518 37.000000 327.0 75 75 1 37 327 \n", + "\n", + " t_ns \n", + "0 1.765786e+07 \n", + "1 1.766079e+07 \n", + "2 1.766100e+07 \n", + "3 1.766216e+07 \n", + "4 1.766246e+07 \n", + "... ... \n", + "659582 1.017648e+09 \n", + "659583 1.017649e+09 \n", + "659584 1.017650e+09 \n", + "659585 1.017650e+09 \n", + "659586 1.017655e+09 \n", + "\n", + "[659587 rows x 9 columns]\n", + "Partition duration: 0.999997634375 sec.\n" + ] + } + ], + "source": [ + "files = db[sid]['primary']['data']['tpx3_files_raw_filepaths'][0].compute()\n", + "\n", + "file = files[0][5:] # cut off 'file:' from beginning\n", + "\n", + "df = tpx.tpx_to_raw_df(file)\n", + "cdf = tpx.raw_df_to_cent_df(df)\n", + "\n", + "print('Raw dataframe: ')\n", + "print(df)\n", + "\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "cdf = tpx.add_centroid_cols(cdf)\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "duration = (cdf.iloc[-1]['t_ns'] - cdf.iloc[0]['t_ns'])/1e9\n", + "print(f'Partition duration: {duration} sec.')" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "48254ae7-8d0c-4766-b1d1-2b7f4732934c", + "metadata": {}, + "outputs": [], + "source": [ + "sid = 143201 # new example SID (1800 partitions at 1 sec/partition from NSLS-II quantum microscope project)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "2adae433-a68c-436e-9640-d4a9745af969", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "-> Processing /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3, size: 10.5 MB\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3 data into dataframe...\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3 complete. 975938 events found. Saving to: /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.h5 complete. Beginning clustering...\n", + "Clustering /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.tpx3 complete. 659537 clusters found. Saving to /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000_cent.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000_cent.h5 complete. Moving onto next file.\n", + "-> Processing /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3, size: 10.5 MB\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3 data into dataframe...\n", + "Loading /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3 complete. 972867 events found. Saving to: /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.h5 complete. Beginning clustering...\n", + "Clustering /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001.tpx3 complete. 657491 clusters found. 
Saving to /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001_cent.h5\n", + "Saving /nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000001_cent.h5 complete. Moving onto next file.\n" + ] + } + ], + "source": [ + "files = db[sid]['primary']['data']['tpx3_files_raw_filepaths'][0].compute()\n", + "files = [f.replace('file:', '') for f in files]\n", + "tpx.convert_tpx3_files(files[0:2])" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "05332e5b-17eb-4074-ad92-51ef1273e9f4", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Raw dataframe: \n", + " x y ToT t chip\n", + "0 210 132 575 11294477 3\n", + "1 106 227 375 11294540 3\n", + "2 107 227 150 11294544 3\n", + "3 381 346 475 11294882 1\n", + "4 268 444 450 11295743 1\n", + "... ... ... ... ... ...\n", + "975933 103 441 600 651291179 2\n", + "975934 193 383 450 651294115 2\n", + "975935 121 280 400 651294141 2\n", + "975936 256 373 400 651294145 1\n", + "975937 255 373 50 651294166 2\n", + "\n", + "[975938 rows x 5 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n\n", + "0 11294477 210.000000 132.0 575 575 1\n", + "1 11294540 106.285713 227.0 375 525 2\n", + "2 11294882 381.000000 346.0 475 475 1\n", + "3 11295743 268.000000 444.0 450 450 1\n", + "4 11298466 420.000000 52.0 450 450 1\n", + "... ... ... ... ... ... ..\n", + "659532 651290601 101.000000 436.0 575 575 1\n", + "659533 651291179 103.000000 441.0 600 600 1\n", + "659534 651294115 193.000000 383.0 450 450 1\n", + "659535 651294141 121.000000 280.0 400 400 1\n", + "659536 651294145 255.888885 373.0 400 450 2\n", + "\n", + "[659537 rows x 6 columns]\n", + "Centroided dataframe: \n", + " t xc yc ToT_max ToT_sum n x y \\\n", + "0 11294477 210.000000 132.0 575 575 1 210 132 \n", + "1 11294540 106.285713 227.0 375 525 2 106 227 \n", + "2 11294882 383.000000 348.0 475 475 1 383 348 \n", + "3 11295743 270.000000 446.0 450 450 1 270 446 \n", + "4 11298466 422.000000 52.0 450 450 1 422 52 \n", + "... ... ... ... ... ... .. ... ... \n", + "659532 651290601 101.000000 438.0 575 575 1 101 438 \n", + "659533 651291179 103.000000 443.0 600 600 1 103 443 \n", + "659534 651294115 193.000000 385.0 450 450 1 193 385 \n", + "659535 651294141 121.000000 282.0 400 400 1 121 282 \n", + "659536 651294145 257.888885 375.0 400 450 2 258 375 \n", + "\n", + " t_ns \n", + "0 1.764762e+07 \n", + "1 1.764772e+07 \n", + "2 1.764825e+07 \n", + "3 1.764960e+07 \n", + "4 1.765385e+07 \n", + "... ... 
\n", + "659532 1.017642e+09 \n", + "659533 1.017642e+09 \n", + "659534 1.017647e+09 \n", + "659535 1.017647e+09 \n", + "659536 1.017647e+09 \n", + "\n", + "[659537 rows x 9 columns]\n", + "Partition duration: 0.99999948125 sec.\n" + ] + } + ], + "source": [ + "df = pd.read_hdf('/nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000.h5')\n", + "cdf = pd.read_hdf('/nsls2/data/chx/legacy/data/2024/02/05/33008fb5-d9f5-4f46-8b5d_00000_000000_cent.h5')\n", + "\n", + "print('Raw dataframe: ')\n", + "print(df)\n", + "\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "cdf = tpx.add_centroid_cols(cdf)\n", + "print('Centroided dataframe: ')\n", + "print(cdf)\n", + "\n", + "duration = (cdf.iloc[-1]['t_ns'] - cdf.iloc[0]['t_ns'])/1e9\n", + "print(f'Partition duration: {duration} sec.')" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "c981be40-e631-4f73-bbab-4fadda8b9285", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|██████████| 1800/1800 [07:51<00:00, 3.82it/s]\n" + ] + } + ], + "source": [ + "try:\n", + " files = db[sid]['primary']['data']['tpx3_files_raw_filepaths'][0].compute()\n", + " files = [f.replace('file:', '') for f in files]\n", + " tpx.convert_tpx3_files_parallel(files)\n", + " \n", + "except Exception as e:\n", + " print(f'Could not finish {sid} due to {e}') " + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "946fa74f-8c8c-4f36-bc0c-9f203e023f78", + "metadata": { + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "All raw .h5 files exist!\n", + "All centroided raw .h5 files exist!\n" + ] + } + ], + "source": [ + "import os\n", + "\n", + "raw_h5_files = [os.path.exists(f.replace('.tpx3', '.h5')) for f in files]\n", + "cent_h5_files = [os.path.exists(f.replace('.tpx3', '_cent.h5')) for f in files]\n", + "\n", + "if all(raw_h5_files):\n", + " print('All raw .h5 files exist!')\n", + "else:\n", + " print('Missing .h5 files!')\n", + " \n", + "if all(cent_h5_files):\n", + " print('All centroided raw .h5 files exist!')\n", + "else:\n", + " print('Missing centroided .h5 files!')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "705c639d-dac9-4517-b5ff-ce44365ed4df", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/tpx3awkward/tests/test_toa.py b/tpx3awkward/tests/test_toa.py new file mode 100644 index 0000000..cb590e8 --- /dev/null +++ b/tpx3awkward/tests/test_toa.py @@ -0,0 +1,67 @@ +from tpx3awkward._utils import get_block, matches_nibble, get_spidr, check_spidr_overflow + +import numpy as np +import pytest + + +@pytest.fixture(scope="function") +def data(n=10): + data = np.zeros(n, dtype=np.uint64) + return data + + +@pytest.fixture(scope="function") +def header_msg(chip=3): + return (np.uint8(chip) << np.uint(32)) + np.uint64(861425748) + + +@pytest.fixture(scope="function") +def empty_msg(): + return np.uint64(0xb) << np.uint(60) + + +def test_get_block(header_msg): + assert np.uint8(get_block(header_msg, 8, 32)) == 3 + + +def 
test_matches_nibble(): + msg = np.uint64(0xb) << np.uint(60) + assert matches_nibble(msg, 0xb) + + +def test_get_spidr(empty_msg): + msg1 = empty_msg + np.uint32(32768) + msg2 = empty_msg + np.uint32(65535) + assert get_spidr(msg1) == 32768 + assert get_spidr(msg2) == 65535 + +def test_check_spidr_overflow(empty_msg): + spidr_arr = [0, 0, 0, 1, 1, 1, 2, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10] + spidr_arr.extend(range(10, 65535, 3)) + spidr_arr.extend([65534, 65534, 65535, 65534, 65534, 65535, 65535, 65534]) + data = [empty_msg + np.uint32(s) for s in spidr_arr] + midpoint, last_spidr = check_spidr_overflow(data, 0) + assert midpoint == 0 + assert last_spidr == 65535 + + spidr_arr = list(range(20000, 65535)) + spidr_arr.extend(range(0, 10000)) + data = [empty_msg + np.uint32(s) for s in spidr_arr] + midpoint, last_spidr = check_spidr_overflow(data, 0) + assert midpoint == 14999 + assert last_spidr == 9999 + + spidr_arr = [65534, 65534, 65535, 65534, 65534, 65535, 0, 65535, 1, 65534, 0, 0, 0, 1, 1, 2, 3, 4, 5] + spidr_arr.extend(range(5, 2000)) + data = [empty_msg + np.uint32(s) for s in spidr_arr] + midpoint, last_spidr = check_spidr_overflow(data, 0) + assert midpoint == 33767 + assert last_spidr == 1999 + + spidr_arr = list(range(20000, 65535)) + spidr_arr.extend([65534, 65534, 65535, 65534, 65534, 65535, 0, 65535, 1, 65534, 0, 0, 1, 1, 2, 3, 4, 5]) + + data = [empty_msg + np.uint32(s) for s in spidr_arr] + midpoint, last_spidr = check_spidr_overflow(data, 0) + assert midpoint == 10002 + assert last_spidr == 5 \ No newline at end of file
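A minimal usage sketch of the helpers introduced in this diff (tpx_to_raw_df, raw_df_to_cent_df, add_centroid_cols, convert_tpx3_files_parallel), mirroring the example notebook above; the input path "example.tpx3" is hypothetical and stands in for any real .tpx3 file:

from pathlib import Path

from tpx3awkward._utils import (
    tpx_to_raw_df,
    raw_df_to_cent_df,
    add_centroid_cols,
    convert_tpx3_files_parallel,
)

fpath = Path("example.tpx3")  # hypothetical input file

# Single-file, in-memory pipeline: decode -> cluster/centroid -> convenience columns.
raw_df = tpx_to_raw_df(fpath)         # columns: x, y, ToT, t, chip
cent_df = raw_df_to_cent_df(raw_df)   # columns: t, xc, yc, ToT_max, ToT_sum, n
cent_df = add_centroid_cols(cent_df)  # adds x, y and t_ns (t * 1.5625)
print(cent_df.head())

# Batch conversion: writes <name>.h5 and <name>_cent.h5 next to each .tpx3 file.
# convert_tpx3_files_parallel([fpath])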