Skip to content

Commit

Permalink
Merge pull request #190 from aglowacki/master
Browse files Browse the repository at this point in the history
Fix exchange format bugs
  • Loading branch information
aglowacki authored Mar 27, 2024
2 parents 0716b11 + 65dc3ac commit 25abb21
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 32 deletions.
77 changes: 45 additions & 32 deletions src/io/file/hdf5_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ void HDF5_IO::update_theta(std::string dataset_file, std::string theta_pv_str)
rerror = H5Dread(extra_names, name_type, memoryspace_id, name_space, H5P_DEFAULT, (void*)tmp_char);

std::string value(tmp_char, 255);
value.erase(std::remove(value.begin(), value.end(), ' '), value.end());
value.erase(std::remove_if(value.begin(), value.end(), ::isspace), value.end());
if (theta_pv_str == value)
{
for (int z = 0; z < 256; z++)
Expand Down Expand Up @@ -2422,7 +2422,7 @@ void HDF5_IO::_add_v9_scalers(hid_t file_id)
if (H5Dread(names_id, filetype, mem_space_1d, name_space, H5P_DEFAULT, (void*)tmp_char) > -1)
{
std::string scaler_name_str = std::string(tmp_char, 255);
scaler_name_str.erase(std::remove(scaler_name_str.begin(), scaler_name_str.end(), ' '), scaler_name_str.end());
scaler_name_str.erase(std::remove_if(scaler_name_str.begin(), scaler_name_str.end(), ::isspace), scaler_name_str.end());
int c_idx = scaler_name_str.find(':');

if (c_idx < 0 && scaler_name_str.length() > 0)
Expand Down Expand Up @@ -2603,27 +2603,26 @@ bool HDF5_IO::_add_exchange_meta(hid_t file_id, std::string exchange_idx, std::s
hid_t unit_single_space;

hid_t chan_type = H5Dget_type(dset_id);
hid_t scalername_type = H5Dget_type(scaler_names_id);
//hid_t scalername_type = H5Dget_type(scaler_names_id);

hid_t chan_space = H5Dget_space(dset_id);
hid_t chan_name_space = H5Dget_space(chan_names_id);
hid_t scaler_space = H5Dget_space(scaler_dset_id);

hid_t memtype = H5Tcopy(H5T_C_S1);
H5Tset_size(memtype, 255);

H5Sget_simple_extent_dims(chan_space, &chan_dims[0], nullptr);
H5Sget_simple_extent_dims(scaler_space, &scaler_dims[0], nullptr);

image_dims_single[0] = { 1 };
hid_t readwrite_single_space = H5Screate_simple(1, &image_dims_single[0], &image_dims_single[0]);

image_dims_single[0] = chan_dims[0] + scaler_dims[0];
image_dims[0] = chan_dims[0] + scaler_dims[0];
image_dims[1] = chan_dims[1];
image_dims[2] = chan_dims[2];
hid_t image_dset_id, image_space, image_single_space;


image_dims[0] = 1;
hid_t readwrite_space = H5Screate_simple(3, &image_dims[0], &image_dims[0]);

image_dims_single[0] = {1};
hid_t readwrite_single_space = H5Screate_simple(1, &image_dims_single[0], &image_dims_single[0]);

if (false == _open_h5_dataset(exchange_images, chan_type, file_id, 3, &image_dims[0], &image_dims[0], image_dset_id, image_space))
{
Expand All @@ -2644,17 +2643,23 @@ bool HDF5_IO::_add_exchange_meta(hid_t file_id, std::string exchange_idx, std::s

double *data = new double[chan_dims[1] * chan_dims[2]];
double*ds_ic_data = new double[chan_dims[1] * chan_dims[2]];
for (int z = 0; z < (chan_dims[1] * chan_dims[2]); z++)
{
data[z] = 0.;
ds_ic_data[z] = 0.;
}
std::string scaler_name_str;
char char_data[256]={0};
char char_ug_data[256]="ug/cm2";
int k =0;

double quant_value = 1.0;

for (std::string::size_type x=0; x<normalize_scaler.length(); ++x)
{
normalize_scaler[x] = std::tolower(normalize_scaler[x]);
}
std::transform(normalize_scaler.begin(), normalize_scaler.end(), normalize_scaler.begin(), [](unsigned char c) { return std::tolower(c); });

image_dims[0] = 1;
hid_t readwrite_space = H5Screate_simple(3, &image_dims[0], &image_dims[0]);
image_dims_single[0] = 1;
// save scalers first
for(hsize_t i=0; i < scaler_dims[0]; i++)
{
Expand All @@ -2664,29 +2669,32 @@ bool HDF5_IO::_add_exchange_meta(hid_t file_id, std::string exchange_idx, std::s
k++;
H5Sselect_hyperslab (image_space, H5S_SELECT_SET, offset, nullptr, image_dims, nullptr);
//read write values
hid_t status = H5Dread(scaler_dset_id, chan_type, readwrite_space, image_space, H5P_DEFAULT, (void*)&data[0]);
hid_t status = H5Dread(scaler_dset_id, H5T_NATIVE_DOUBLE, readwrite_space, image_space, H5P_DEFAULT, (void*)&data[0]);
if(status > -1)
{
H5Dwrite(image_dset_id, chan_type, readwrite_space, image_space, H5P_DEFAULT, (void*)&data[0]);
status = H5Dwrite(image_dset_id, H5T_NATIVE_DOUBLE, readwrite_space, image_space, H5P_DEFAULT, (void*)&data[0]);
if (status == -1)
{
logW << "Issue saving scaler index " << offset[0] << "\n";
}
}

//read write names
H5Sselect_hyperslab (image_single_space, H5S_SELECT_SET, offset_single, nullptr, image_dims_single, nullptr);
status = H5Dread(scaler_names_id, scalername_type, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
status = H5Dread(scaler_names_id, memtype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
if(status > -1)
{
H5Dwrite(image_names_dset_id, filetype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
H5Dwrite(image_names_dset_id, memtype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
}


scaler_name_str = std::string(char_data, 256);
scaler_name_str.erase(std::remove(scaler_name_str.begin(), scaler_name_str.end(), ' '), scaler_name_str.end());
scaler_name_str.erase(std::find(scaler_name_str.begin(), scaler_name_str.end(), '\0'), scaler_name_str.end());
scaler_name_str.erase(std::remove_if(scaler_name_str.begin(), scaler_name_str.end(), ::isspace), scaler_name_str.end());
//to lower
for (std::string::size_type x=0; x<scaler_name_str.length(); ++x)
{
scaler_name_str[x] = std::tolower(scaler_name_str[x]);
}
if(scaler_name_str == normalize_scaler)
std::transform(scaler_name_str.begin(), scaler_name_str.end(), scaler_name_str.begin(), [](unsigned char c) { return std::tolower(c); });

if(normalize_scaler.compare(scaler_name_str) == 0)
{
for(hsize_t z=0; z < (chan_dims[1] * chan_dims[2]); z++)
{
Expand All @@ -2695,10 +2703,10 @@ bool HDF5_IO::_add_exchange_meta(hid_t file_id, std::string exchange_idx, std::s
}

//read write units
status = H5Dread(scaler_units_id, scalername_type, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
status = H5Dread(scaler_units_id, memtype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
if(status > -1)
{
H5Dwrite(image_units_dset_id, filetype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
H5Dwrite(image_units_dset_id, memtype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
}
}

Expand All @@ -2712,24 +2720,25 @@ bool HDF5_IO::_add_exchange_meta(hid_t file_id, std::string exchange_idx, std::s
// read write names
H5Sselect_hyperslab (chan_name_space, H5S_SELECT_SET, offset_single, nullptr, image_dims_single, nullptr);
H5Sselect_hyperslab (image_single_space, H5S_SELECT_SET, offset_image, nullptr, image_dims_single, nullptr);
hid_t status = H5Dread(chan_names_id, scalername_type, readwrite_single_space, chan_name_space, H5P_DEFAULT, (void*)&char_data[0]);
hid_t status = H5Dread(chan_names_id, memtype, readwrite_single_space, chan_name_space, H5P_DEFAULT, (void*)&char_data[0]);
if(status > -1)
{
H5Dwrite(image_names_dset_id, filetype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
H5Dwrite(image_names_dset_id, memtype, readwrite_single_space, image_single_space, H5P_DEFAULT, (void*)&char_data[0]);
}

// get quantification for ds_ic and store in quant_value
if(ds_ic_quant_id > -1)
{
std::string chan_name_str = std::string(char_data, 256);
chan_name_str.erase(std::remove(chan_name_str.begin(), chan_name_str.end(), ' '), chan_name_str.end());
chan_name_str.erase(std::find(chan_name_str.begin(), chan_name_str.end(), '\0'), chan_name_str.end());
chan_name_str.erase(std::remove_if(chan_name_str.begin(), chan_name_str.end(), ::isspace), chan_name_str.end());
data_struct::Element_Info<double>* element = data_struct::Element_Info_Map<double>::inst()->get_element(chan_name_str);
if(element != nullptr)
{
offset_quant[1] = element->number - 1;

H5Sselect_hyperslab (quant_space, H5S_SELECT_SET, offset_quant, nullptr, count_quant, nullptr);
hid_t status = H5Dread(ds_ic_quant_id, quant_type, readwrite_single_space, quant_space, H5P_DEFAULT, (void*)&quant_value);
hid_t status = H5Dread(ds_ic_quant_id, H5T_NATIVE_DOUBLE, readwrite_single_space, quant_space, H5P_DEFAULT, (void*)&quant_value);
if(status < 0)
{
quant_value = 1.0;
Expand All @@ -2745,14 +2754,18 @@ bool HDF5_IO::_add_exchange_meta(hid_t file_id, std::string exchange_idx, std::s
H5Sselect_hyperslab (chan_space, H5S_SELECT_SET, offset, nullptr, image_dims, nullptr);
H5Sselect_hyperslab (image_space, H5S_SELECT_SET, offset_image, nullptr, image_dims, nullptr);
//read write values
status = H5Dread(dset_id, chan_type, readwrite_space, chan_space, H5P_DEFAULT, (void*)&data[0]);
status = H5Dread(dset_id, H5T_NATIVE_DOUBLE, readwrite_space, chan_space, H5P_DEFAULT, (void*)&data[0]);
if(status > -1)
{
for(hsize_t z=0; z < (chan_dims[1] * chan_dims[2]); z++)
{
data[z] = data[z] / quant_value / ds_ic_data[z];
}
H5Dwrite(image_dset_id, chan_type, readwrite_space, image_space, H5P_DEFAULT, (void*)&data[0]);
status = H5Dwrite(image_dset_id, H5T_NATIVE_DOUBLE, readwrite_space, image_space, H5P_DEFAULT, (void*)&data[0]);
if (status == -1)
{
logW << "Issue saving data index " << offset[0] << "\n";
}
}

//read write units
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"redis_host": "hyrule",
"node_name": "invid",
"xrf_param_override":
{
"dataset_dir": "/home/beams/AGLOWACKI/mdata/2idd_luxi",
"detector_num": 0
}
}
168 changes: 168 additions & 0 deletions test/pva_xrf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#!/usr/bin/env python

import sys
import json
import time
import random
import numpy as np
import pvaccess as pva
import redis
import pyxrfmaps as px

element_csv_filename = "../reference/xrf_library.csv"
element_henke_filename = "../reference/henke.xdr"

class XRF_Stream_Source():
def __init__(self, name, param_override, CHANNEL):
self.node_name = name
self.bnode_name = bytes(name, 'utf-8')

self.dataset_directory = "/"
self.dataset_name = "Stream"

self.row = 0
self.col = 0
self.width = 50
self.height = 50

# Select fitting routine
#self.fit_rout = px.fitting.routines.nnls()
self.fit_rout = px.fitting.routines.roi()

# Use Gausian Model
self.model = px.fitting.models.GaussModel()

# Load fit parameters
self.po = param_override

self.channel = pva.Channel(CHANNEL)
# monitor
self.channel.monitor(self.monitor)

self.sprectra_streamer = px.workflow.SpectraNetStreamer("43434")
self.sprectra_streamer.set_send_counts(True)
self.sprectra_streamer.set_send_spectra(False)

def init_fit_routine(self, int_spec):
# Initialize model and fit routine with fit parameters
self.energy_range = px.get_energy_range(int_spec.size, po.fit_params)
self.model.update_fit_params_values(po.fit_params)
self.fit_rout.initialize(model, po.elements_to_fit, energy_range)

def config_change_handler(self, msg):
if msg['data'] == self.bnode_name:
print(msg)

def fit_spec(self, spec, id):
if spec is not None:
# create a stream block
sb = px.StreamBlock(-1, self.row, self.col, self.height, self.width, self.dataset_directory, self.dataset_name)
sb.init_fitting_blocks( {px.FittingRoutines.ROI: self.fit_rout}, self.po.elements_to_fit)
#sb.spectra = spec
sb.fitting_blocks[px.FittingRoutines.ROI] = px.StreamFittingBlock()
# fit the spectra
sb.fitting_blocks[px.FittingRoutines.ROI].fit_counts = self.fit_rout.fit_counts(self.model, spec, self.po.elements_to_fit)
print (self.col, id, sb.fitting_blocks[px.FittingRoutines.ROI].fit_counts)
self.sprectra_streamer.stream(sb)
self.col += 1
if self.col >= self.width:
self.col = 0
self.row += 1
if self.row >= self.height:
self.row = 0
if self.row >= self.height:
self.row = 0
# push to redis
##

def monitor(self, pv):
xdim = 0
yxim = 0
data_arr = None
print(pv['attribute'][3])
if len(pv['dimension']) == 2:
xdim = pv['dimension'][0]['size']
ydim = pv['dimension'][1]['size']
if 'shortValue' in pv['value'][0]:
data_arr = pv['value'][0]['shortValue'].reshape(xdim * ydim, 1).astype(np.float32) # TODO: redo so we get proper dims
elif 'ushortValue' in pv['value'][0]:
data_arr = pv['value'][0]['ushortValue'].reshape(xdim * ydim, 1).astype(np.float32) # TODO: redo so we get proper dims
elif 'intValue' in pv['value'][0]:
data_arr = pv['value'][0]['intValue'].reshape(xdim * ydim, 1).astype(np.float32) # TODO: redo so we get proper dims
elif 'uintValue' in pv['value'][0]:
data_arr = pv['value'][0]['uintValue'].reshape(xdim * ydim, 1).astype(np.float32) # TODO: redo so we get proper dims
elif 'floatValue' in pv['value'][0]:
data_arr = pv['value'][0]['floatValue'].reshape(xdim * ydim, 1) # TODO: redo so we get proper dims
else:
print(pv['value'][0].keys())
#print('Got image: %d' % pv['uniqueId'], xdim, ydim, data_arr[:] )
self.fit_spec(data_arr, pv['uniqueId'])
#'value', 'codec', 'compressedSize', 'uncompressedSize', 'dimension', 'uniqueId', 'dataTimeStamp', 'attribute', 'descriptor', 'alarm', 'timeStamp', 'display'

def main():
CHANNEL = None
config_dict = None
param_override = None
p = None
if len(sys.argv) < 2:
print("Please provide config file as second arg")
exit(1)

print ('Loading json file ', sys.argv[1])
with open(sys.argv[1]) as json_file:
config_dict = json.load(json_file)
print (config_dict)

if config_dict == None:
print('Error loading json file ', sys.argv[1])

if 'redis_host' in config_dict:
r = redis.Redis(config_dict['redis_host'])
# get initial config
doc = r.json().get('xrf_workers', '$')
if not config_dict['node_name'] in doc[0]:
print ('Config not found for ', config_dict['node_name'])
print (doc)
exit(1)

# subscribe to config_change for live changes
#p = r.pubsub()
#p.subscribe(**{'config_change': config_change_handler})
# connect to PVA channel
CHANNEL = str(doc[0][config_dict['node_name']]['PVA']) # 'bdpSimDetector:Pva1:Image'
print(config_dict['node_name'], CHANNEL)

# get param override from redis

if CHANNEL == None:
if 'PVA' in config_dict:
CHANNEL = config_dict['PVA']
else:
print("Please add 'PVA' to config.json or add a 'redis_host' to get config data from")
exit(1)

if param_override == None:
if "xrf_param_override" in config_dict:
config_dict["xrf_param_override"]["dataset_dir"] = config_dict["xrf_param_override"]["dataset_dir"] + '/'
param_override = px.load_override_params(config_dict["xrf_param_override"]["dataset_dir"], config_dict["xrf_param_override"]["detector_num"], True)

# load ref info
px.load_element_info(element_henke_filename, element_csv_filename)

if 'node_name' not in config_dict:
config_dict['node_name'] = 'node_' + str(random.randint(100,1000))
print('Could not find "node_name" in config.json. Using random generated name = ', config_dict['node_name'] )
source = XRF_Stream_Source(config_dict['node_name'], param_override, CHANNEL)

# loop and check config changes
for i in range(1000):
#if p is not None:
#p.get_message()
time.sleep(1)
#time.sleep(1000)


if __name__ == '__main__':
main()


0 comments on commit 25abb21

Please sign in to comment.