From 84867930a1d89f741ff6b7072cab9139a806baba Mon Sep 17 00:00:00 2001
From: Brandon Oubre
Date: Fri, 2 Aug 2024 12:30:33 -0400
Subject: [PATCH] Re-Split XDF Script (#424)

* Introducing a script to resplit an XDF file that can be run on the Martinos cluster.
* Making neurobooth_os/terra dependencies optional for split xdf
* DDL for new log split table
* Implementing HDF5 corrections
* Fixing JSON parsing of sensor IDs in old XDF using single quotes
---
 examples/hdf5_corrections.yml          |  17 ++
 examples/split_task_device_map.yml     | 388 +++++++++++++++++++++++++
 neurobooth_os/iout/hdf5_corrections.py | 153 ++++++++++
 neurobooth_os/iout/resplit_xdf.py      | 353 ++++++++++++++++++++++
 neurobooth_os/iout/split_xdf.py        |  15 +-
 sql/log_split.sql                      |  15 +
 6 files changed, 934 insertions(+), 7 deletions(-)
 create mode 100755 examples/hdf5_corrections.yml
 create mode 100755 examples/split_task_device_map.yml
 create mode 100755 neurobooth_os/iout/hdf5_corrections.py
 create mode 100755 neurobooth_os/iout/resplit_xdf.py
 create mode 100644 sql/log_split.sql

diff --git a/examples/hdf5_corrections.yml b/examples/hdf5_corrections.yml
new file mode 100755
index 00000000..865291ae
--- /dev/null
+++ b/examples/hdf5_corrections.yml
@@ -0,0 +1,17 @@
+# Specify which functions should be used to apply in-memory corrections to HDF5 data before writing.
+
+marker: neurobooth_os.iout.hdf5_corrections.py::correct_marker()
+devices:
+  Eyelink_1: neurobooth_os.iout.hdf5_corrections.py::correct_eyelink()
+  FLIR_blackfly_1: neurobooth_os.iout.hdf5_corrections.py::correct_flir()
+  Intel_D455_1: neurobooth_os.iout.hdf5_corrections.py::correct_intel()
+  Intel_D455_2: neurobooth_os.iout.hdf5_corrections.py::correct_intel()
+  Intel_D455_3: neurobooth_os.iout.hdf5_corrections.py::correct_intel()
+  IPhone_dev_1: neurobooth_os.iout.hdf5_corrections.py::correct_iphone()
+  Mbient_BK_1: neurobooth_os.iout.hdf5_corrections.py::correct_mbient()
+  Mbient_LH_2: neurobooth_os.iout.hdf5_corrections.py::correct_mbient()
+  Mbient_LF_2: neurobooth_os.iout.hdf5_corrections.py::correct_mbient()
+  Mbient_RH_2: neurobooth_os.iout.hdf5_corrections.py::correct_mbient()
+  Mbient_RF_2: neurobooth_os.iout.hdf5_corrections.py::correct_mbient()
+  Mic_Yeti_dev_1: neurobooth_os.iout.hdf5_corrections.py::correct_yeti()
+  Mouse: neurobooth_os.iout.hdf5_corrections.py::correct_mouse()
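Each entry maps a device to a correction callable using the `module.py::func()` convention parsed later in this patch. As a sketch, a plug-in correction for a hypothetical device `MyCamera_dev_1` in a hypothetical module `my_pkg.my_corrections` could look like:

```python
# Referenced from the YAML above as:
#   MyCamera_dev_1: my_pkg.my_corrections.py::correct_my_camera()
import json

from neurobooth_os.iout.split_xdf import DeviceData


def correct_my_camera(data: DeviceData) -> DeviceData:
    desc = data.device_data['info']['desc'][0]
    if 'column_names' not in desc:  # Only patch legacy recordings
        desc['column_names'] = [json.dumps(['FrameNum', 'Time'])]
    return data
```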
diff --git a/examples/split_task_device_map.yml b/examples/split_task_device_map.yml
new file mode 100755
index 00000000..211db454
--- /dev/null
+++ b/examples/split_task_device_map.yml
@@ -0,0 +1,388 @@
+# This file defines default task-device mappings for the XDF split run on the cluster.
+# These mappings are used if the actual device list cannot be located in log_device_param.
+
+intro_sess_obs_1:
+- Intel_D455_1
+
+intro_occulo_obs_1:
+- Intel_D455_1
+
+calibration_obs_1:
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+pursuit_obs:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+fixation_no_target_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+gaze_holding_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+saccades_horizontal_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+saccades_vertical_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+intro_cog_obs_1:
+- Intel_D455_1
+
+MOT_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+DSC_obs:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+hevelius_obs:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mouse
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+intro_speech_obs_1:
+- Intel_D455_1
+
+passage_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mouse
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+ahh_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+gogogo_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+lalala_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+mememe_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+pataka_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+coord_pause_obs_1:
+- Intel_D455_1
+
+finger_nose_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+foot_tapping_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
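The lookup these presets feed is a plain YAML load followed by a key lookup (see `device_id_from_yaml()` later in this patch). A quick sketch of the equivalent by hand, assuming the example file path from this repo:

```python
import yaml

with open('examples/split_task_device_map.yml', 'r') as stream:
    task_device_map = yaml.safe_load(stream)

# Returns the preset device ID list for the task, e.g. ['Intel_D455_1', ...]
print(task_device_map['pursuit_obs'])
```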
+altern_hand_mov_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+
+coord_pause_obs_2:
+- Intel_D455_1
+
+sit_to_stand_obs:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+
+clapping_test_obs:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+timing_test_obs:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+- Eyelink_1
+- IPhone_dev_1
+
+saccades_horizontal_obs_test:
+- FLIR_blackfly_1
+- Eyelink_1
+- IPhone_dev_1
+
+pursuit_test_obs_1:
+- FLIR_blackfly_1
+- Eyelink_1
+- IPhone_dev_1
+
+pursuit_test_obs_test:
+- FLIR_blackfly_1
+- Eyelink_1
+- IPhone_dev_1
+
+finger_nose_demo_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Mic_Yeti_dev_1
+
+mouse_demo_obs:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mouse
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+passage_demo_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mouse
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+
+noIMU_ahh_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
+
+noIMU_gogogo_obs_1:
+- Intel_D455_1
+- Intel_D455_2
+- Intel_D455_3
+- Mbient_LH_2
+- Mbient_RH_2
+- Mbient_RF_2
+- Mbient_LF_2
+- Mbient_BK_1
+- FLIR_blackfly_1
+- Eyelink_1
+- Mic_Yeti_dev_1
+- IPhone_dev_1
\ No newline at end of file
diff --git a/neurobooth_os/iout/hdf5_corrections.py b/neurobooth_os/iout/hdf5_corrections.py
new file mode 100755
index 00000000..795dad4c
--- /dev/null
+++ b/neurobooth_os/iout/hdf5_corrections.py
@@ -0,0 +1,153 @@
+"""
+HDF5 correction functions are implemented in this file to avoid dependency nightmares.
+It is also possible to implement them in the device files, or as separate supplements to device files.
+"""
+
+import json
+from neurobooth_os.iout.split_xdf import DeviceData
+from neurobooth_os.iout.stream_utils import DataVersion
+
+
+def get_description(data):
+    """Helper function to retrieve the description element of the XDF structure."""
+    return data['info']['desc'][0]
+
+
+def get_data_version(data) -> DataVersion:
+    """Extract the data version from the device or marker data. Assume v0.0 if the key is missing."""
+    try:
+        desc = get_description(data)
+        return DataVersion.from_str(desc['data_version'][0])
+    # Old marker descriptions don't have the full structure or store data version
+    except (KeyError, AttributeError, TypeError, IndexError):
+        return DataVersion(0, 0)
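As a sanity check of the fallback path: a legacy stream whose description element was never populated fails inside the `try` block and is treated as v0.0, which is what makes every correction below fire. Illustrative structure only:

```python
from neurobooth_os.iout.hdf5_corrections import get_data_version

legacy_stream = {'info': {'desc': [None]}}  # desc element present but empty
print(get_data_version(legacy_stream))      # Falls back to DataVersion(0, 0)
```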
+
+
+def correct_marker(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.marker_data)
+    if data_version.major < 1:
+        data.marker_data['info']['desc'] = {
+            'data_version': [str(data_version)],
+            'column_names': [json.dumps(['Marker'])],
+            'column_descriptions': [json.dumps({'Marker': 'Marker message string'})],
+            'device_id': 'marker',
+            'sensor_ids': json.dumps(['marker']),
+        }
+    return data
+
+
+def correct_intel(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.device_data)
+    if data_version.major < 1:
+        desc = get_description(data.device_data)
+        desc['data_version'] = [str(data_version)]
+        desc['column_names'] = [json.dumps(['FrameNum', 'FrameNum_RealSense', 'Time_RealSense', 'Time_ACQ'])]
+        desc['column_descriptions'] = [json.dumps({
+            'FrameNum': 'Locally-tracked frame number',
+            'FrameNum_RealSense': 'Camera-tracked frame number',
+            'Time_RealSense': 'Camera timestamp (ms)',
+            'Time_ACQ': 'Local machine timestamp (s)',
+        })]
+    return data
+
+
+def correct_eyelink(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.device_data)
+    if data_version.major < 1:
+        desc = get_description(data.device_data)
+        desc['data_version'] = [str(data_version)]
+        desc['column_names'] = [json.dumps([
+            'R_GazeX', 'R_GazeY', 'R_PupilSize',
+            'L_GazeX', 'L_GazeY', 'L_PupilSize',
+            'Target_PositionX', 'Target_PositionY', 'Target_Distance',
+            'R_PPD', 'L_PPD',
+            'Time_EDF', 'Time_NUC'
+        ])]
+        desc['column_descriptions'] = [json.dumps({
+            'R_GazeX': 'Right eye: Horizontal gaze location on screen (pixels)',
+            'R_GazeY': 'Right eye: Vertical gaze location on screen (pixels)',
+            'R_PupilSize': 'Right eye: Pupil size (arbitrary units; see EyeLink documentation)',
+            'L_GazeX': 'Left eye: Horizontal gaze location on screen (pixels)',
+            'L_GazeY': 'Left eye: Vertical gaze location on screen (pixels)',
+            'L_PupilSize': 'Left eye: Pupil size (arbitrary units; see EyeLink documentation)',
+            'Target_PositionX': 'Horizontal location of the bullseye target (camera pixels)',
+            'Target_PositionY': 'Vertical location of the bullseye target (camera pixels)',
+            'Target_Distance': 'Distance to the bullseye target',
+            'R_PPD': 'Right eye: Angular resolution at current gaze position (pixels per visual degree)',
+            'L_PPD': 'Left eye: Angular resolution at current gaze position (pixels per visual degree)',
+            'Time_EDF': 'Timestamp within the EDF file (ms)',
+            'Time_NUC': 'Local timestamp of sample receipt by the NUC machine (s)',
+        })]
+    return data
+
+
+def correct_flir(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.device_data)
+    if data_version.major < 1:
+        desc = get_description(data.device_data)
+        desc['data_version'] = [str(data_version)]
+        desc['column_names'] = [json.dumps(['FrameNum', 'Time_FLIR'])]
+        desc['column_descriptions'] = [json.dumps({
+            'FrameNum': 'Frame number',
+            'Time_FLIR': 'Camera timestamp (ns)',
+        })]
+    return data
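Since every correction stores the column metadata as a JSON string wrapped in a single-element list, downstream consumers can decode it the same way regardless of device. A small helper sketch (the helper name is assumed, not part of this patch):

```python
import json
from typing import List

from neurobooth_os.iout.split_xdf import DeviceData


def column_names(data: DeviceData) -> List[str]:
    """Recover the column labels written by the corrections in this file."""
    desc = data.device_data['info']['desc'][0]
    return json.loads(desc['column_names'][0])
```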
+
+
+def correct_iphone(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.device_data)
+    if data_version.major < 1:
+        desc = get_description(data.device_data)
+        desc['data_version'] = [str(data_version)]
+        desc['column_names'] = [json.dumps(['FrameNum', 'Time_iPhone', 'Time_ACQ'])]
+        desc['column_descriptions'] = [json.dumps({
+            'FrameNum': 'App-tracked frame number',
+            'Time_iPhone': 'App timestamp (s)',
+            'Time_ACQ': 'Local machine timestamp (s)',
+        })]
+    return data
+
+
+def correct_mbient(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.device_data)
+    if data_version.major < 1:
+        desc = get_description(data.device_data)
+        desc['data_version'] = [str(data_version)]
+        desc['column_names'] = [json.dumps(['Time_Mbient', 'AccelX', 'AccelY', 'AccelZ', 'GyroX', 'GyroY', 'GyroZ'])]
+        desc['column_descriptions'] = [json.dumps({
+            'Time_Mbient': 'Device timestamp (ms; epoch)',
+            'AccelX': 'X component of acceleration in local coordinate frame (g)',
+            'AccelY': 'Y component of acceleration in local coordinate frame (g)',
+            'AccelZ': 'Z component of acceleration in local coordinate frame (g)',
+            'GyroX': 'Angular velocity about X axis in local coordinate frame (deg/s)',
+            'GyroY': 'Angular velocity about Y axis in local coordinate frame (deg/s)',
+            'GyroZ': 'Angular velocity about Z axis in local coordinate frame (deg/s)',
+        })]
+    return data
+
+
+def correct_yeti(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.device_data)
+    if data_version.major < 1:
+        desc = get_description(data.device_data)
+        desc['data_version'] = [str(data_version)]
+        desc['column_names'] = [json.dumps(['ElapsedTime', 'Amplitude (1024 samples)'])]
+        desc['column_descriptions'] = [json.dumps({
+            'ElapsedTime': 'Elapsed time on the local LSL clock since the last chunk of samples (ms)',
+            'Amplitude (1024 samples)': 'Remaining columns represent a chunk of audio samples.',
+        })]
+    return data
+
+
+def correct_mouse(data: DeviceData) -> DeviceData:
+    data_version = get_data_version(data.device_data)
+    if data_version.major < 1:
+        desc = get_description(data.device_data)
+        desc['data_version'] = [str(data_version)]
+        desc['column_names'] = [json.dumps(['PosX', 'PosY', 'MouseState'])]
+        desc['column_descriptions'] = [json.dumps({
+            'PosX': 'X screen coordinate of the mouse (pixels)',
+            'PosY': 'Y screen coordinate of the mouse (pixels)',
+            'MouseState': 'Flag for the state of the mouse (0=move, 1=click, -1=release)',
+        })]
+    return data
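Taken together with `parse_xdf()` from split_xdf.py, these functions can also be applied by hand, outside the resplit script below. A sketch with a hypothetical session file:

```python
from neurobooth_os.iout import split_xdf as xdf
from neurobooth_os.iout.hdf5_corrections import correct_mouse

# File name is hypothetical; parse only the Mouse stream.
device_data = xdf.parse_xdf('100001_2024-08-02_12h-30m-33s_passage_obs_1_R001.xdf', ['Mouse'])
device_data = [correct_mouse(dev) for dev in device_data]  # No-op for data_version >= 1
xdf.write_device_hdf5(device_data)
```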
diff --git a/neurobooth_os/iout/resplit_xdf.py b/neurobooth_os/iout/resplit_xdf.py
new file mode 100755
index 00000000..c853c1a7
--- /dev/null
+++ b/neurobooth_os/iout/resplit_xdf.py
@@ -0,0 +1,353 @@
+"""
+This file splits an XDF file into constituent HDF5 files. It is meant to be called on the cluster without a full
+installation of Neurobooth-OS.
+"""
+
+import os
+import re
+import argparse
+import datetime
+import importlib
+from typing import NamedTuple, List, Dict, Optional, Any, Callable, ClassVar
+
+import yaml
+import psycopg2 as pg
+from pydantic import BaseModel
+
+import neurobooth_os.config as cfg
+import neurobooth_os.iout.split_xdf as xdf
+
+
+class SplitException(Exception):
+    """For generic errors that occur when splitting an XDF file."""
+    pass
+
+
+class HDF5CorrectionSpec(BaseModel):
+    marker: Optional[str] = None
+    devices: Dict[str, str] = {}
+
+    @staticmethod
+    def load(path: str) -> 'HDF5CorrectionSpec':
+        """
+        Load the correction specification from a YAML configuration file.
+        :param path: The path to the YAML file.
+        :return: The correction specification.
+        """
+        try:
+            with open(path, 'r') as stream:
+                return HDF5CorrectionSpec(**yaml.safe_load(stream))
+        except Exception as e:
+            raise SplitException(f'Unable to load correction functions from {path}!') from e
+
+    FUNC_STR_PATTERN: ClassVar = re.compile(r'(.*)\.py::(.*)\(\)')
+
+    @staticmethod
+    def import_function(func_str: str) -> Callable:
+        """
+        Import and return the function specified by a fully.qualified.module.py::func() string.
+        This code is adapted from metadator, but we avoid the import because of dependency baggage.
+        :param func_str: The string to parse and import.
+        :return: The imported function.
+        """
+        match = re.match(HDF5CorrectionSpec.FUNC_STR_PATTERN, func_str)
+        if match is None:
+            raise SplitException(f'The function specification does not match the expected pattern: {func_str}')
+        module, func = match.groups()
+
+        try:
+            module = importlib.import_module(module)
+            return getattr(module, func)
+        except Exception as e:
+            raise SplitException(f'Unable to import {func_str}') from e
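Round-tripping one of the strings from examples/hdf5_corrections.yml through this parser, for illustration:

```python
from neurobooth_os.iout.resplit_xdf import HDF5CorrectionSpec

func = HDF5CorrectionSpec.import_function(
    'neurobooth_os.iout.hdf5_corrections.py::correct_mouse()'
)
# The regex strips the '.py' suffix and the '()' call markers, so this imports
# neurobooth_os.iout.hdf5_corrections and returns its correct_mouse function.
```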
+
+    def correct_device(self, device: xdf.DeviceData) -> xdf.DeviceData:
+        """
+        Apply in-memory corrections to device data if corrections were specified for the given device/marker.
+        :param device: The device structure loaded from the XDF file.
+        :return: The corrected device structure.
+        """
+        if self.marker is not None:
+            func = HDF5CorrectionSpec.import_function(self.marker)
+            device = func(device)
+
+        device_id = device.device_id
+        if device_id in self.devices:
+            func = HDF5CorrectionSpec.import_function(self.devices[device_id])
+            device = func(device)
+
+        return device
+
+
+XDF_NAME_PATTERN = re.compile(r'(\d+)_(\d\d\d\d-\d\d-\d\d)_\d\dh-\d\dm-\d\ds_(.*)_R001\.xdf', flags=re.IGNORECASE)
+
+
+class XDFInfo(NamedTuple):
+    """Structured representation of an XDF file name."""
+    parent_dir: str
+    name: str
+    subject_id: str
+    date: datetime.date
+    task_id: str
+
+    @property
+    def path(self) -> str:
+        return os.path.join(self.parent_dir, self.name)
+
+    @staticmethod
+    def parse_xdf_name(xdf_path: str) -> 'XDFInfo':
+        """
+        Attempt to infer the subject ID, date, and task ID from the XDF file path.
+        :param xdf_path: The path to the XDF file.
+        :return: A structured representation of the XDF file name.
+        """
+        parent_dir, filename = os.path.split(xdf_path)
+        match = re.match(XDF_NAME_PATTERN, filename)
+        if match is None:
+            raise SplitException(f'Unable to parse file name: {filename}')
+
+        subject_id, date_str, task_id = match.groups()
+        return XDFInfo(
+            parent_dir=parent_dir,
+            name=filename,
+            subject_id=subject_id,
+            date=datetime.date.fromisoformat(date_str),
+            task_id=task_id,
+        )
+
+
+class DatabaseConnection:
+    """Handles limited interactions with the Neurobooth database."""
+
+    def __init__(self, config_path: str, tunnel: bool):
+        """
+        Create a new DatabaseConnection based on the provided Neurobooth-OS configuration file.
+        :param config_path: The path to the Neurobooth-OS configuration, including a 'database' entry.
+        :param tunnel: Whether to SSH tunnel prior to connecting. Should be False if running on neurodoor.
+        """
+        self.connection = DatabaseConnection.connect(config_path, tunnel)
+
+    @staticmethod
+    def connect(config_path: str, tunnel: bool) -> pg.extensions.connection:
+        """
+        Load and parse a Neurobooth-OS configuration, then create a psycopg2 connection.
+        Note: This function copies some code from metadator.py, but importing that file introduces extra dependencies.
+
+        :param config_path: The path to the Neurobooth-OS configuration, including a 'database' entry.
+        :param tunnel: Whether to SSH tunnel prior to connecting. Should be False if running on neurodoor.
+        """
+        cfg.load_config(config_path, validate_paths=False)
+        database_info = cfg.neurobooth_config.database
+
+        if tunnel:
+            from sshtunnel import SSHTunnelForwarder
+            tunnel = SSHTunnelForwarder(
+                database_info.remote_host,
+                ssh_username=database_info.remote_user,
+                ssh_config_file="~/.ssh/config",
+                ssh_pkey="~/.ssh/id_rsa",
+                remote_bind_address=(database_info.host, database_info.port),
+                local_bind_address=("localhost", 6543),
+            )
+            tunnel.start()
+            host = tunnel.local_bind_host
+            port = tunnel.local_bind_port
+        else:
+            host = database_info.host
+            port = database_info.port
+
+        return pg.connect(
+            database=database_info.dbname,
+            user=database_info.user,
+            password=database_info.password,
+            host=host,
+            port=port,
+        )
+
+    DEVICE_ID_QUERY = """
+        WITH device AS (
+            -- This subquery defines a temporary table of log device IDs
+            -- associated with the specified task and session.
+            SELECT UNNEST(tparam.log_device_ids) AS log_device_id  -- Flatten the list
+            -- We need to do a chain of joins to get from the session -> task -> task parameters
+            FROM log_session sess
+            JOIN log_task task
+                ON sess.log_session_id = task.log_session_id
+            JOIN log_task_param tparam
+                ON task.log_task_id = tparam.log_task_id
+            -- Filter to just the session and task of interest.
+            -- We use a parameterized query and pass in the filters to psycopg2.
+            WHERE sess.subject_id = %(subject_id)s
+                AND sess.date = %(session_date)s
+                AND task.task_id = %(task_id)s
+        )
+        -- Now we can look up which devices were actually present during the task recording.
+        SELECT dparam.device_id
+        FROM device
+        JOIN log_device_param dparam
+            ON device.log_device_id = dparam.id
+    """
+
+    def get_device_ids(self, xdf_info: XDFInfo) -> List[str]:
+        """
+        Retrieve the list of device IDs associated with a given task and session.
+        :param xdf_info: An XDF info structure, which details the task and session.
+        :return: The list of device IDs retrieved from the log_* tables in the database.
+        """
+        query_params = {
+            'subject_id': xdf_info.subject_id,
+            'session_date': xdf_info.date.isoformat(),
+            'task_id': xdf_info.task_id,
+        }
+        with self.connection.cursor() as cursor:
+            cursor.execute(DatabaseConnection.DEVICE_ID_QUERY, query_params)
+            return [row[0] for row in cursor.fetchall()]
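Putting `XDFInfo` and `DatabaseConnection` together, the device lookup for one recording might look like the following sketch (the configuration path is hypothetical; `tunnel=False` assumes the script runs on neurodoor):

```python
from neurobooth_os.iout.resplit_xdf import DatabaseConnection, XDFInfo

db = DatabaseConnection('/path/to/neurobooth_os_config.json', tunnel=False)
xdf_info = XDFInfo.parse_xdf_name('100001_2024-08-02_12h-30m-33s_pursuit_obs_R001.xdf')
print(db.get_device_ids(xdf_info))  # e.g. ['Eyelink_1', 'FLIR_blackfly_1', ...]
```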
+
+    def log_split(self, xdf_info: XDFInfo, device_data: List[xdf.DeviceData]) -> None:
+        """
+        Create entries in the log_split table to reflect created HDF5 files.
+        :param xdf_info: An XDF info structure, which details the task and session.
+        :param device_data: Structures representing the XDF data for each device.
+        """
+        with self.connection.cursor() as cursor:
+            for device in device_data:
+                # The file path should be session/file.hdf5 to permit comparison to log_sensor_file
+                hdf5_folder, hdf5_file = os.path.split(device.hdf5_path)
+                _, session_folder = os.path.split(hdf5_folder)
+                hdf5_file = f'{session_folder}/{hdf5_file}'
+
+                for sensor_id in device.sensor_ids:
+                    query_params = {
+                        'subject_id': xdf_info.subject_id,
+                        'date': xdf_info.date.isoformat(),
+                        'task_id': xdf_info.task_id,
+                        'device_id': device.device_id,
+                        'sensor_id': sensor_id,
+                        'hdf5_file_path': hdf5_file,
+                    }
+                    cursor.execute(
+                        """
+                        INSERT INTO log_split (subject_id, date, task_id, device_id, sensor_id, hdf5_file_path)
+                        VALUES (%(subject_id)s, %(date)s, %(task_id)s, %(device_id)s, %(sensor_id)s, %(hdf5_file_path)s)
+                        """,
+                        query_params
+                    )
+        self.connection.commit()
+
+
+def device_id_from_yaml(file: str, task_id: str) -> List[str]:
+    """
+    Load a YAML file defining preset task ID -> device ID mappings and look up the given task.
+    :param file: The YAML file containing the mappings.
+    :param task_id: The task ID to look up.
+    :return: The preset device IDs associated with the task ID.
+    """
+    try:
+        with open(file, 'r') as stream:
+            task_device_map = yaml.safe_load(stream)
+        return task_device_map[task_id]
+    except Exception as e:
+        raise SplitException(f'Could not locate task {task_id} using map file {file}.') from e
+
+
+def split(
+        xdf_path: str,
+        database_conn: DatabaseConnection,
+        task_map_file: Optional[str] = None,
+        corrections: Optional[HDF5CorrectionSpec] = None,
+) -> None:
+    """
+    Split a single XDF file into device-specific HDF5 files.
+    Intended to be called either via the command line (via parse_arguments()) or by another script (e.g., one that
+    finds and iterates over all files to be split).
+
+    :param xdf_path: The path to the XDF file to split.
+    :param database_conn: A connection interface to the Neurobooth database.
+    :param task_map_file: (Optional) A YAML file containing a preset mapping of task ID -> device IDs.
+    :param corrections: (Optional) Apply device-specific in-memory corrections before writing data to HDF5 files.
+    """
+    xdf_info = XDFInfo.parse_xdf_name(xdf_path)
+
+    # Look up device IDs for the given task and session
+    if task_map_file is not None:
+        device_ids = device_id_from_yaml(task_map_file, xdf_info.task_id)
+    else:
+        device_ids = database_conn.get_device_ids(xdf_info)
+
+    if not device_ids:  # Check that we found at least one device ID
+        raise SplitException('Could not locate task ID {} for session {}_{}.'.format(
+            xdf_info.task_id, xdf_info.subject_id, xdf_info.date.isoformat()
+        ))
+
+    # Parse the XDF, apply corrections, write the resulting HDF5, and add an entry to log_split in the database.
+    device_data = xdf.parse_xdf(xdf_path, device_ids)
+    if corrections is not None:
+        device_data = [corrections.correct_device(dev) for dev in device_data]
+    xdf.write_device_hdf5(device_data)
+    database_conn.log_split(xdf_info, device_data)
+
+
+def parse_arguments() -> Dict[str, Any]:
+    """
+    Parse command line arguments.
+    :return: Dictionary of keyword arguments to split().
+    """
+    parser = argparse.ArgumentParser(description='Split an XDF file into device-specific HDF5 files.')
+    parser.add_argument(
+        '--xdf',
+        required=True,
+        type=str,
+        help="Path to the XDF file to split."
+    )
+    parser.add_argument(
+        '--config-path',
+        default=None,
+        type=str,
+        help="Specify a path to a Neurobooth configuration file with a 'database' entry."
+    )
+    parser.add_argument(
+        '--ssh-tunnel',
+        action='store_true',
+        help=(
+            "Specify this flag to SSH tunnel before connecting to the database. "
+            "This flag is not needed if running on the same machine as the database."
+        )
+    )
+    parser.add_argument(
+        '--task-device-map',
+        type=str,
+        default=None,
+        help="If provided, the specified YAML file will be used to define a preset map of task ID -> device IDs."
+    )
+    parser.add_argument(
+        '--hdf5-corrections',
+        type=str,
+        default=None,
+        help="If provided, the specified YAML file will be used to locate correction functions for each device ID."
+    )
+
+    def abspath(path: Optional[str]) -> Optional[str]:
+        return os.path.abspath(path) if path is not None else path
+
+    args = parser.parse_args()
+    task_map_file = abspath(args.task_device_map)
+    database_conn = DatabaseConnection(abspath(args.config_path), args.ssh_tunnel)
+    corrections = abspath(args.hdf5_corrections)
+    if corrections is not None:
+        corrections = HDF5CorrectionSpec.load(corrections)
+
+    return {
+        'xdf_path': os.path.abspath(args.xdf),
+        'database_conn': database_conn,
+        'task_map_file': task_map_file,
+        'corrections': corrections,
+    }
+
+
+def main() -> None:
+    """Entry point for command-line calls."""
+    split(**parse_arguments())
+
+
+if __name__ == '__main__':
+    main()
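A hypothetical end-to-end invocation on the cluster (paths are placeholders; `--ssh-tunnel` is only needed when not running on the database host):

```
python -m neurobooth_os.iout.resplit_xdf \
    --xdf /data/100001_2024-08-02/100001_2024-08-02_12h-30m-33s_pursuit_obs_R001.xdf \
    --config-path /path/to/neurobooth_os_config.json \
    --ssh-tunnel \
    --task-device-map examples/split_task_device_map.yml \
    --hdf5-corrections examples/hdf5_corrections.yml
```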
diff --git a/neurobooth_os/iout/split_xdf.py b/neurobooth_os/iout/split_xdf.py
index 0a466821..377dd035 100644
--- a/neurobooth_os/iout/split_xdf.py
+++ b/neurobooth_os/iout/split_xdf.py
@@ -11,9 +11,6 @@
 import h5io
 import json
 
-from neurobooth_os.iout import metadator as meta
-from neurobooth_terra import Table
-
 
 def compute_clocks_diff() -> float:
     """
@@ -38,6 +35,8 @@ def split_sens_files(
     :param conn: Connection to the database.
     :returns: The list of HDF5 files generated by the split.
     """
+    # We import this here so that it is not a dependency for the external split_xdf script.
+    from neurobooth_os.iout import metadator as meta
     t0 = time.time()
 
     device_data = parse_xdf(xdf_path, meta.get_device_ids(task_id))
@@ -64,7 +63,6 @@ def parse_xdf(xdf_path: str, device_ids: Optional[List[str]] = None) -> List[Dev
     :param device_ids: If provided, only parse files corresponding to the specified devices.
     :returns: A structured representation of information extracted from the XDF file for each device.
     """
-    folder, file_name = os.path.split(xdf_path)
     data, _ = pyxdf.load_xdf(xdf_path, dejitter_timestamps=False)
 
     # Find marker stream to associate with each device
@@ -94,7 +92,8 @@ def parse_xdf(xdf_path: str, device_ids: Optional[List[str]] = None) -> List[Dev
             continue
 
         device_id = device_data["info"]["desc"][0]["device_id"][0]
-        sensors_ids = json.loads(device_data["info"]["desc"][0]["sensor_ids"][0])  # Deserialize into list
+        sensor_id_str = device_data["info"]["desc"][0]["sensor_ids"][0]
+        sensor_ids = json.loads(sensor_id_str.replace("'", '"'))  # Deserialize into list
 
         if (device_ids is not None) and (device_id not in device_ids):  # Only split specified devices
             continue
@@ -104,8 +103,8 @@ results.append(DeviceData(
             device_data=device_data,
             marker_data=marker,
             video_files=video_files[device_name] if device_name in video_files else [],
-            sensor_ids=sensors_ids,
-            hdf5_path=_make_hdf5_path(xdf_path, device_id, sensors_ids),
+            sensor_ids=sensor_ids,
+            hdf5_path=_make_hdf5_path(xdf_path, device_id, sensor_ids),
         ))
 
     return results
@@ -158,6 +157,8 @@ def log_to_database(
     :param conn: A database connection object.
     :param log_task_id: The value to insert into the log_task_id column.
     """
+    # We import this here so that it is not a dependency for the external split_xdf script.
+    from neurobooth_terra import Table
     table_sens_log = Table("log_sensor_file", conn=conn)
 
     for dev in device_data:
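The motivation for the single-quote replacement above: older XDF files serialized the sensor ID list with Python's repr(), which strict JSON rejects. A quick illustration (the sensor ID value is hypothetical):

```python
import json

old_style = "['Mouse_sens_1']"  # Single quotes, as written by old recordings
# json.loads(old_style) would raise json.JSONDecodeError
print(json.loads(old_style.replace("'", '"')))  # ['Mouse_sens_1']
```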
diff --git a/sql/log_split.sql b/sql/log_split.sql
new file mode 100644
index 00000000..0469d6c9
--- /dev/null
+++ b/sql/log_split.sql
@@ -0,0 +1,15 @@
+CREATE TABLE log_split (
+    id integer primary key generated always as identity,
+    created_at timestamp with time zone DEFAULT now() NOT NULL,
+    subject_id character varying(255) NOT NULL,
+    date date NOT NULL,
+    task_id character varying(255) NOT NULL,
+    device_id character varying(255) NOT NULL,
+    sensor_id character varying(255) NOT NULL,
+    hdf5_file_path text NOT NULL
+);
+
+
+ALTER TABLE log_split OWNER TO neuroboother;
+GRANT SELECT ON TABLE log_split TO neurovisualizer;
+
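A sketch of the kind of bookkeeping query the new table enables (subject and date are hypothetical):

```sql
SELECT task_id, device_id, sensor_id, hdf5_file_path
FROM log_split
WHERE subject_id = '100001' AND date = '2024-08-02'
ORDER BY created_at;
```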