From 2ff21608a8c4dbd58f92c26f23a324c28ad2122a Mon Sep 17 00:00:00 2001 From: Konstantinos Parasyris Date: Mon, 17 Jun 2024 16:25:45 -0700 Subject: [PATCH] Allow hdf5 database to store all data and their predicate information * Fixes minor bug in switch-case statement of resource manger in case of device executions. --- src/AMSlib/AMS.cpp | 18 ++ src/AMSlib/CMakeLists.txt | 9 +- src/AMSlib/wf/basedb.hpp | 305 ++++++------------ src/AMSlib/wf/hdf5db.cpp | 240 ++++++++++++++ src/AMSlib/wf/resource_manager.cpp | 1 + src/AMSlib/wf/resource_manager.hpp | 2 + src/AMSlib/wf/utils.cpp | 16 + src/AMSlib/wf/utils.hpp | 10 +- src/AMSlib/wf/workflow.hpp | 110 ++++--- tests/AMSlib/CMakeLists.txt | 6 + .../env_2_models_fs_rand_uq.json.in | 21 +- tests/AMSlib/verify_ete.py | 45 ++- 12 files changed, 517 insertions(+), 266 deletions(-) create mode 100644 src/AMSlib/wf/hdf5db.cpp create mode 100644 src/AMSlib/wf/utils.cpp diff --git a/src/AMSlib/AMS.cpp b/src/AMSlib/AMS.cpp index 5c5d3066..9f326d34 100644 --- a/src/AMSlib/AMS.cpp +++ b/src/AMSlib/AMS.cpp @@ -49,6 +49,7 @@ struct AMSAbstractModel { std::string SPath; std::string UQPath; std::string DBLabel; + bool DebugDB; double threshold; AMSUQPolicy uqPolicy; int nClusters; @@ -85,6 +86,15 @@ struct AMSAbstractModel { return value["db_label"].get(); } + bool parseDebugDB(nlohmann::json &value) + { + if (!value.contains("debug_db")) { + return false; + } + + return value["debug_db"].get(); + } + void parseUQPaths(AMSUQPolicy policy, nlohmann::json &jRoot) { @@ -189,6 +199,12 @@ struct AMSAbstractModel { threshold = value["threshold"].get(); parseUQPaths(uqPolicy, value); DBLabel = parseDBLabel(value); + DebugDB = parseDebugDB(value); + + CFATAL(AMS, + DebugDB && (SPath.empty()), + "To store predicates in dabase, a surrogate model field is " + "mandatory"); } @@ -199,6 +215,7 @@ struct AMSAbstractModel { double threshold, int num_clusters) { + DebugDB = false; if (db_label == nullptr) FATAL(AMS, "registering model without a database 
identifier\n"); @@ -550,6 +567,7 @@ ams::AMSWorkflow *_AMSCreateExecutor(AMSCAbstrModel model, model_descr.UQPath, model_descr.SPath, model_descr.DBLabel, + model_descr.DebugDB, resource_type, model_descr.threshold, model_descr.uqPolicy, diff --git a/src/AMSlib/CMakeLists.txt b/src/AMSlib/CMakeLists.txt index 35159b0c..ebcf6230 100644 --- a/src/AMSlib/CMakeLists.txt +++ b/src/AMSlib/CMakeLists.txt @@ -7,13 +7,18 @@ file(GLOB_RECURSE MINIAPP_INCLUDES "*.hpp") #set global library path to link with tests if necessary set(LIBRARY_OUTPUT_PATH ${AMS_LIB_OUT_PATH}) -set(AMS_LIB_SRC ${MINIAPP_INCLUDES} AMS.cpp wf/resource_manager.cpp wf/debug.cpp wf/basedb.cpp wf/logger.cpp) +set(AMS_LIB_SRC ${MINIAPP_INCLUDES} AMS.cpp wf/resource_manager.cpp wf/debug.cpp wf/basedb.cpp wf/logger.cpp wf/utils.cpp) if (WITH_CUDA) list(APPEND AMS_LIB_SRC wf/cuda/utilities.cpp) - message(WARNING "FILES ARE ${AMS_LIB_SRC}") endif() +if (WITH_HDF5) + list(APPEND AMS_LIB_SRC wf/hdf5db.cpp) +endif() + + + # two targets: a shared lib and an exec add_library(AMS ${AMS_LIB_SRC} ${MINIAPP_INCLUDES}) diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index d6acc086..c17e3f22 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -9,6 +9,8 @@ #define __AMS_BASE_DB__ +#include + #include #include #include @@ -22,7 +24,6 @@ #include "AMS.h" #include "debug.h" -#include "resource_manager.hpp" #include "wf/debug.h" #include "wf/resource_manager.hpp" #include "wf/utils.hpp" @@ -36,10 +37,8 @@ namespace fs = std::experimental::filesystem; #warning Redis is currently not supported/tested #endif - #ifdef __ENABLE_HDF5__ #include - #define HDF5_ERROR(Eid) \ if (Eid < 0) { \ std::cerr << "[Error] Happened in " << __FILE__ << ":" \ @@ -48,6 +47,7 @@ namespace fs = std::experimental::filesystem; } #endif + #ifdef __ENABLE_RMQ__ #include #include @@ -116,16 +116,21 @@ class BaseDB virtual void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) = 0; + std::vector& 
outputs, + bool* predicate = nullptr) = 0; virtual void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) = 0; + std::vector& outputs, + bool* predicate = nullptr) = 0; + uint64_t getId() const { return id; } virtual bool updateModel() { return false; } + + virtual bool storePredicate() const { return false; } }; /** @@ -273,15 +278,26 @@ class csvDB final : public FileDB virtual void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) override + std::vector& outputs, + bool* predicate = nullptr) override { + CFATAL(CSV, + predicate != nullptr, + "CSV database does not support storing uq-predicates") + _store(num_elements, inputs, outputs); } virtual void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) override + std::vector& outputs, + bool* predicate = nullptr) override { + + CFATAL(CSV, + predicate != nullptr, + "CSV database does not support storing uq-predicates") + _store(num_elements, inputs, outputs); } @@ -309,8 +325,8 @@ class csvDB final : public FileDB */ }; -#ifdef __ENABLE_HDF5__ +#ifdef __ENABLE_HDF5__ class hdf5DB final : public FileDB { private: @@ -331,66 +347,25 @@ class hdf5DB final : public FileDB hid_t HDType; + /** @brief the dataset descriptor of the predicates */ + hid_t pSet; + + const bool predicateStore; + /** @brief create or get existing hdf5 dataset with the provided name * storing data as Ckunked pieces. The Chunk value controls the chunking * performed by HDF5 and thus controls the write performance * @param[in] group in which we will store data under * @param[in] dName name of the data set + * @param[in] dataType dataType to be stored for this dataset * @param[in] Chunk chunk size of dataset used by HDF5. 
* @reval dataset HDF5 key value */ - hid_t getDataSet(hid_t group, std::string dName, const size_t Chunk = 1024L) - { - // Our datasets a.t.m are 1-D vectors - const int nDims = 1; - // We always start from 0 - hsize_t dims = 0; - hid_t dset = -1; - - int exists = H5Lexists(group, dName.c_str(), H5P_DEFAULT); - - if (exists > 0) { - dset = H5Dopen(group, dName.c_str(), H5P_DEFAULT); - HDF5_ERROR(dset); - // We are assuming symmetrical data sets a.t.m - if (totalElements == 0) { - hid_t dspace = H5Dget_space(dset); - const int ndims = H5Sget_simple_extent_ndims(dspace); - hsize_t dims[ndims]; - H5Sget_simple_extent_dims(dspace, dims, NULL); - totalElements = dims[0]; - } - return dset; - } else { - // We will extend the data-set size, so we use unlimited option - hsize_t maxDims = H5S_UNLIMITED; - hid_t fileSpace = H5Screate_simple(nDims, &dims, &maxDims); - HDF5_ERROR(fileSpace); - - hid_t pList = H5Pcreate(H5P_DATASET_CREATE); - HDF5_ERROR(pList); - - herr_t ec = H5Pset_layout(pList, H5D_CHUNKED); - HDF5_ERROR(ec); - - // cDims impacts performance considerably. - // TODO: Align this with the caching mechanism for this option to work - // out. 
- hsize_t cDims = Chunk; - H5Pset_chunk(pList, nDims, &cDims); - dset = H5Dcreate(group, - dName.c_str(), - HDType, - fileSpace, - H5P_DEFAULT, - pList, - H5P_DEFAULT); - HDF5_ERROR(dset); - H5Sclose(fileSpace); - H5Pclose(pList); - } - return dset; - } + hid_t getDataSet(hid_t group, + std::string dName, + hid_t dataType, + const size_t Chunk = 1024L); + /** * @brief Create the HDF5 datasets and store their descriptors in the in/out @@ -401,19 +376,7 @@ class hdf5DB final : public FileDB */ void createDataSets(size_t numElements, const size_t numIn, - const size_t numOut) - { - for (int i = 0; i < numIn; i++) { - hid_t dSet = getDataSet(HFile, std::string("input_") + std::to_string(i)); - HDIsets.push_back(dSet); - } - - for (int i = 0; i < numOut; i++) { - hid_t dSet = - getDataSet(HFile, std::string("output_") + std::to_string(i)); - HDOsets.push_back(dSet); - } - } + const size_t numOut); /** * @brief Write all the data in the vectors in the respective datasets. @@ -426,83 +389,22 @@ class hdf5DB final : public FileDB template void writeDataToDataset(std::vector& dsets, std::vector& data, - size_t numElements) - { - int index = 0; - for (auto* I : data) { - writeVecToDataset(dsets[index++], static_cast(I), numElements); - } - } + size_t numElements); /** @brief Writes a single 1-D vector to the dataset * @param[in] dSet the dataset to write the data to * @param[in] data the data we need to write * @param[in] elements the number of data elements we have + * @param[in] datatype of elements we will write */ - void writeVecToDataset(hid_t dSet, void* data, size_t elements) - { - const int nDims = 1; - hsize_t dims = elements; - hsize_t start; - hsize_t count; - hid_t memSpace = H5Screate_simple(nDims, &dims, NULL); - HDF5_ERROR(memSpace); - - dims = totalElements + elements; - H5Dset_extent(dSet, &dims); - - hid_t fileSpace = H5Dget_space(dSet); - HDF5_ERROR(fileSpace); - - // Data set starts at offset totalElements - start = totalElements; - // And we append 
additional elements - count = elements; - // Select hyperslab - herr_t err = H5Sselect_hyperslab( - fileSpace, H5S_SELECT_SET, &start, NULL, &count, NULL); - HDF5_ERROR(err); - - H5Dwrite(dSet, HDType, memSpace, fileSpace, H5P_DEFAULT, data); - H5Sclose(fileSpace); - } + void writeVecToDataset(hid_t dSet, void* data, size_t elements, hid_t DType); PERFFASPECT() template void _store(size_t num_elements, std::vector& inputs, - std::vector& outputs) - { - if (isDouble::default_value()) - HDType = H5T_NATIVE_DOUBLE; - else - HDType = H5T_NATIVE_FLOAT; - - - DBG(DB, - "DB of type %s stores %ld elements of input/output dimensions (%lu, " - "%lu)", - type().c_str(), - num_elements, - inputs.size(), - outputs.size()) - const size_t num_in = inputs.size(); - const size_t num_out = outputs.size(); - - if (HDIsets.empty()) { - createDataSets(num_elements, num_in, num_out); - } - - if (HDIsets.size() != num_in || HDOsets.size() != num_out) { - std::cerr << "The data dimensionality is different than the one in the " - "DB\n"; - exit(-1); - } - - writeDataToDataset(HDIsets, inputs, num_elements); - writeDataToDataset(HDOsets, outputs, num_elements); - totalElements += num_elements; - } + std::vector& outputs, + bool* predicate = nullptr); public: // Delete copy constructors. 
We do not want to copy the DB around @@ -516,40 +418,20 @@ class hdf5DB final : public FileDB * @param[in] rId a unique Id for each process taking part in a distributed * execution (rank-id) */ - hdf5DB(std::string path, std::string fn, uint64_t rId) - : FileDB(path, fn, ".h5", rId) - { - std::error_code ec; - bool exists = fs::exists(this->fn); - this->checkError(ec); - - if (exists) - HFile = H5Fopen(this->fn.c_str(), H5F_ACC_RDWR, H5P_DEFAULT); - else - HFile = - H5Fcreate(this->fn.c_str(), H5F_ACC_EXCL, H5P_DEFAULT, H5P_DEFAULT); - HDF5_ERROR(HFile); - totalElements = 0; - HDType = -1; - } + hdf5DB(std::string path, + std::string fn, + uint64_t rId, + bool predicate = false); /** * @brief deconstructs the class and closes the file */ - ~hdf5DB(){ - DBG(DB, "Closing File: %s %s", type().c_str(), this->fn.c_str()) - // HDF5 Automatically closes all opened fds at exit of application. - // herr_t err = H5Fclose(HFile); - // HDF5_ERROR(err); - } + ~hdf5DB(); /** * @brief Define the type of the DB */ - std::string type() override - { - return "hdf5"; - } + std::string type() override { return "hdf5"; } /** * @brief Return the DB enumerationt type (File, Redis etc) @@ -570,18 +452,9 @@ class hdf5DB final : public FileDB */ void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) override - { - if (HDType == -1) { - HDType = H5T_NATIVE_FLOAT; - } else if (HDType != H5T_NATIVE_FLOAT) { - THROW(std::runtime_error, - "Database " + fn + - " initialized to work on 'float' received different " - "datatypes'"); - } - _store(num_elements, inputs, outputs); - } + std::vector& outputs, + bool* predicate = nullptr) override; + /** * @brief Takes an input and an output vector each holding 1-D vectors data, @@ -596,22 +469,18 @@ class hdf5DB final : public FileDB */ void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) override - { - if (HDType == -1) { - HDType = H5T_NATIVE_DOUBLE; - } else if (HDType != H5T_NATIVE_DOUBLE) { - 
THROW(std::runtime_error, - "Database " + fn + - " initialized to work on 'float' received different " - "datatypes'"); - } + std::vector& outputs, + bool* predicate = nullptr) override; - _store(num_elements, inputs, outputs); - } + /** + * @brief Returns whether the DB can also store predicate information for debug + * purposes + */ + bool storePredicate() const override { return predicateStore; } }; #endif + #ifdef __ENABLE_REDIS__ template class RedisDB : public BaseDB @@ -724,9 +593,14 @@ class RedisDB : public BaseDB void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) + std::vector& outputs, + bool predicate = nullptr) override { + CFATAL(REDIS, + predicate != nullptr, + "REDIS database does not support storing uq-predicates") + const size_t num_in = inputs.size(); const size_t num_out = outputs.size(); @@ -1652,7 +1526,8 @@ class RMQPublisherHandler final : public AMQP::LibEventHandler data = msg.data(), &_messages = this->_messages]() mutable { DBG(RMQPublisherHandler, - "[rank=%d] message #%d (Addr:%p) got acknowledged successfully " + "[rank=%d] message #%d (Addr:%p) got acknowledged " + "successfully " "by " "RMQ " "server", @@ -1664,7 +1539,8 @@ class RMQPublisherHandler final : public AMQP::LibEventHandler }) .onNack([this, id = msg.id(), data = msg.data()]() mutable { WARNING(RMQPublisherHandler, - "[rank=%d] message #%d (%p) received negative acknowledged " + "[rank=%d] message #%d (%p) received negative " + "acknowledged " "by " "RMQ " "server", @@ -2398,15 +2274,25 @@ class RabbitMQDB final : public BaseDB PERFFASPECT() void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) override + std::vector& outputs, + bool* predicate = nullptr) override { + CFATAL(RMQDB, + predicate != nullptr, + "RMQ database does not support storing uq-predicates") + interface.publish(appDomain, num_elements, inputs, outputs); } void store(size_t num_elements, std::vector& inputs, - std::vector& outputs) override + 
std::vector& outputs, + bool* predicate = nullptr) override { + CFATAL(RMQDB, + predicate != nullptr, + "RMQ database does not support storing uq-predicates") + interface.publish(appDomain, num_elements, inputs, outputs); } @@ -2544,12 +2430,18 @@ class DBManager * @param[in] domainName name of the domain model to store data for * @param[in] dbType Type of the database to create * @param[in] rId a unique Id for each process taking part in a distributed + * @param[in] isDebug Whether this db will store both ml and physics predictions with the associated predicate * execution (rank-id) */ std::shared_ptr createDB(std::string& domainName, AMSDBType dbType, - uint64_t rId = 0) + uint64_t rId = 0, + bool isDebug = false) { + CWARNING(DBManager, + (isDebug && dbType != AMSDBType::AMS_HDF5), + "Requesting debug database but %d db type does not support it", + dbType); #ifdef __ENABLE_DB__ DBG(DBManager, "Instantiating data base"); @@ -2566,7 +2458,10 @@ class DBManager return std::make_shared(fs_interface.path(), domainName, rId); #ifdef __ENABLE_HDF5__ case AMSDBType::AMS_HDF5: - return std::make_shared(fs_interface.path(), domainName, rId); + return std::make_shared(fs_interface.path(), + domainName, + rId, + isDebug); #endif #ifdef __ENABLE_RMQ__ case AMSDBType::AMS_RMQ: @@ -2588,7 +2483,9 @@ class DBManager * @param[in] rId a unique Id for each process taking part in a distributed * execution (rank-id) */ - std::shared_ptr getDB(std::string& domainName, uint64_t rId = 0) + std::shared_ptr getDB(std::string& domainName, + uint64_t rId = 0, + bool isDebug = false) { DBG(DBManager, "Requested DB Under Name: '%s' DB Configured to operate with '%s'", @@ -2601,7 +2498,7 @@ class DBManager auto db_iter = db_instances.find(std::string(domainName)); if (db_iter == db_instances.end()) { - auto db = createDB(domainName, dbType, rId); + auto db = createDB(domainName, dbType, rId, isDebug); db_instances.insert(std::make_pair(std::string(domainName), db)); DBG(DBManager, "Creating 
new Database writting to file: %s", @@ -2630,17 +2527,25 @@ class DBManager return db; } - void instantiate_fs_db(AMSDBType type, std::string db_path) + void instantiate_fs_db(AMSDBType type, + std::string db_path, + bool is_debug = false) { CWARNING(DBManager, isInitialized(), - "Data Base is already initialized. Reconfiguring can result into " + "Data Base is already initialized. Reconfiguring can result " + "into " "issues") CWARNING(DBManager, dbType != AMSDBType::AMS_NONE, "Setting DBManager default DB when already set") dbType = type; + + CWARNING(DBManager, + (is_debug && dbType != AMSDBType::AMS_HDF5), + "Only HDF5 supports debug") + if (dbType != AMSDBType::AMS_NONE) fs_interface.connect(db_path); } }; diff --git a/src/AMSlib/wf/hdf5db.cpp b/src/AMSlib/wf/hdf5db.cpp new file mode 100644 index 00000000..ea3daea5 --- /dev/null +++ b/src/AMSlib/wf/hdf5db.cpp @@ -0,0 +1,240 @@ +/* + * Copyright 2021-2023 Lawrence Livermore National Security, LLC and other + * AMSLib Project Developers + * + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + */ + +#include "wf/basedb.hpp" + +using namespace ams::db; + +hid_t hdf5DB::getDataSet(hid_t group, + std::string dName, + hid_t dataType, + const size_t Chunk) +{ + // Our datasets a.t.m are 1-D vectors + const int nDims = 1; + // We always start from 0 + hsize_t dims = 0; + hid_t dset = -1; + + int exists = H5Lexists(group, dName.c_str(), H5P_DEFAULT); + + if (exists > 0) { + dset = H5Dopen(group, dName.c_str(), H5P_DEFAULT); + HDF5_ERROR(dset); + // We are assuming symmetrical data sets a.t.m + if (totalElements == 0) { + hid_t dspace = H5Dget_space(dset); + const int ndims = H5Sget_simple_extent_ndims(dspace); + hsize_t dims[ndims]; + H5Sget_simple_extent_dims(dspace, dims, NULL); + totalElements = dims[0]; + } + return dset; + } else { + // We will extend the data-set size, so we use unlimited option + hsize_t maxDims = H5S_UNLIMITED; + hid_t fileSpace = H5Screate_simple(nDims, &dims, &maxDims); + 
HDF5_ERROR(fileSpace); + + hid_t pList = H5Pcreate(H5P_DATASET_CREATE); + HDF5_ERROR(pList); + + herr_t ec = H5Pset_layout(pList, H5D_CHUNKED); + HDF5_ERROR(ec); + + // cDims impacts performance considerably. + // TODO: Align this with the caching mechanism for this option to work + // out. + hsize_t cDims = Chunk; + H5Pset_chunk(pList, nDims, &cDims); + dset = H5Dcreate(group, + dName.c_str(), + dataType, + fileSpace, + H5P_DEFAULT, + pList, + H5P_DEFAULT); + HDF5_ERROR(dset); + H5Sclose(fileSpace); + H5Pclose(pList); + } + return dset; +} + + +void hdf5DB::createDataSets(size_t numElements, + const size_t numIn, + const size_t numOut) +{ + for (int i = 0; i < numIn; i++) { + hid_t dSet = + getDataSet(HFile, std::string("input_") + std::to_string(i), HDType); + HDIsets.push_back(dSet); + } + + for (int i = 0; i < numOut; i++) { + hid_t dSet = + getDataSet(HFile, std::string("output_") + std::to_string(i), HDType); + HDOsets.push_back(dSet); + } + + if (storePredicate()) { + pSet = getDataSet(HFile, "predicate", H5T_NATIVE_HBOOL); + } +} + +template +void hdf5DB::writeDataToDataset(std::vector& dsets, + std::vector& data, + size_t numElements) +{ + int index = 0; + for (auto* I : data) { + writeVecToDataset(dsets[index++], + static_cast(I), + numElements, + HDType); + } +} + +void hdf5DB::writeVecToDataset(hid_t dSet, + void* data, + size_t elements, + hid_t DType) +{ + const int nDims = 1; + hsize_t dims = elements; + hsize_t start; + hsize_t count; + hid_t memSpace = H5Screate_simple(nDims, &dims, NULL); + HDF5_ERROR(memSpace); + + dims = totalElements + elements; + H5Dset_extent(dSet, &dims); + + hid_t fileSpace = H5Dget_space(dSet); + HDF5_ERROR(fileSpace); + + // Data set starts at offset totalElements + start = totalElements; + // And we append additional elements + count = elements; + // Select hyperslab + herr_t err = H5Sselect_hyperslab( + fileSpace, H5S_SELECT_SET, &start, NULL, &count, NULL); + HDF5_ERROR(err); + + H5Dwrite(dSet, DType, memSpace, 
fileSpace, H5P_DEFAULT, data); + H5Sclose(fileSpace); +} + + +template +void hdf5DB::_store(size_t num_elements, + std::vector& inputs, + std::vector& outputs, + bool* predicate) +{ + if (isDouble::default_value()) + HDType = H5T_NATIVE_DOUBLE; + else + HDType = H5T_NATIVE_FLOAT; + + + CFATAL(HDF5DB, + storePredicate() && predicate == nullptr, + "DB Configured to store predicates, predicate is not provided") + + + DBG(DB, + "DB of type %s stores %ld elements of input/output dimensions (%lu, " + "%lu)", + type().c_str(), + num_elements, + inputs.size(), + outputs.size()) + const size_t num_in = inputs.size(); + const size_t num_out = outputs.size(); + + if (HDIsets.empty()) { + createDataSets(num_elements, num_in, num_out); + } + + CFATAL(HDF5DB, + (HDIsets.size() != num_in || HDOsets.size() != num_out), + "The data dimensionality is different than the one in the " + "DB") + + writeDataToDataset(HDIsets, inputs, num_elements); + writeDataToDataset(HDOsets, outputs, num_elements); + + if (storePredicate() && predicate != nullptr) { + writeVecToDataset(pSet, + static_cast(predicate), + num_elements, + H5T_NATIVE_HBOOL); + } + + totalElements += num_elements; +} + + +hdf5DB::hdf5DB(std::string path, std::string fn, uint64_t rId, bool predicate) + : FileDB(path, fn, predicate ? ".debug.h5" : ".h5", rId), + predicateStore(predicate) +{ + std::error_code ec; + bool exists = fs::exists(this->fn); + this->checkError(ec); + + if (exists) + HFile = H5Fopen(this->fn.c_str(), H5F_ACC_RDWR, H5P_DEFAULT); + else + HFile = H5Fcreate(this->fn.c_str(), H5F_ACC_EXCL, H5P_DEFAULT, H5P_DEFAULT); + HDF5_ERROR(HFile); + totalElements = 0; + HDType = -1; +} + + +hdf5DB::~hdf5DB() +{ + DBG(DB, "Closing File: %s %s", type().c_str(), this->fn.c_str()) + // HDF5 Automatically closes all opened fds at exit of application. 
+ // herr_t err = H5Fclose(HFile); + // HDF5_ERROR(err); +} + +void hdf5DB::store(size_t num_elements, + std::vector& inputs, + std::vector& outputs, + bool* predicate) +{ + if (HDType == -1) { + HDType = H5T_NATIVE_FLOAT; + } + + CFATAL(HDF5DB, + HDType != H5T_NATIVE_FLOAT, + "Database %s initialized to work on 'float' received different " + "datatypes", + fn.c_str()); + + _store(num_elements, inputs, outputs, predicate); +} + + +void hdf5DB::store(size_t num_elements, + std::vector& inputs, + std::vector& outputs, + bool* predicate) +{ + if (HDType == -1) { + HDType = H5T_NATIVE_DOUBLE; + } + _store(num_elements, inputs, outputs, predicate); +} diff --git a/src/AMSlib/wf/resource_manager.cpp b/src/AMSlib/wf/resource_manager.cpp index 8b9da265..fed18d4b 100644 --- a/src/AMSlib/wf/resource_manager.cpp +++ b/src/AMSlib/wf/resource_manager.cpp @@ -92,6 +92,7 @@ void _raw_copy(void *src, FATAL(ResourceManager, "Unknown device type to copy to from DEVICE"); break; } + break; default: FATAL(ResourceManager, "Unknown device type to copy from"); } diff --git a/src/AMSlib/wf/resource_manager.hpp b/src/AMSlib/wf/resource_manager.hpp index b86fdb6b..813c8d56 100644 --- a/src/AMSlib/wf/resource_manager.hpp +++ b/src/AMSlib/wf/resource_manager.hpp @@ -50,6 +50,7 @@ struct AMSAllocator { const std::string getName() const; }; + class ResourceManager { private: @@ -197,6 +198,7 @@ class ResourceManager //! 
------------------------------------------------------------------------ }; + } // namespace ams #endif diff --git a/src/AMSlib/wf/utils.cpp b/src/AMSlib/wf/utils.cpp new file mode 100644 index 00000000..0dc8f8ad --- /dev/null +++ b/src/AMSlib/wf/utils.cpp @@ -0,0 +1,16 @@ +/* + * Copyright 2021-2023 Lawrence Livermore National Security, LLC and other + * AMSLib Project Developers + * + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + */ + +#include "wf/utils.hpp" + +void random_uq_host(bool *uq_flags, int ndata, double acceptable_error) +{ + + for (int i = 0; i < ndata; i++) { + uq_flags[i] = ((double)rand() / RAND_MAX) <= acceptable_error; + } +} diff --git a/src/AMSlib/wf/utils.hpp b/src/AMSlib/wf/utils.hpp index 711a6b86..06ea0527 100644 --- a/src/AMSlib/wf/utils.hpp +++ b/src/AMSlib/wf/utils.hpp @@ -47,15 +47,9 @@ class isDouble // ----------------------------------------------------------------------------- // ----------------------------------------------------------------------------- -void random_uq_host(bool *uq_flags, int ndata, double acceptable_error) -{ - - for (int i = 0; i < ndata; i++) { - uq_flags[i] = ((double)rand() / RAND_MAX) <= acceptable_error; - } -} +void random_uq_host(bool *uq_flags, int ndata, double acceptable_error); -template +template inline bool is_real_equal(T l, T r) { return r == std::nextafter(l, r); diff --git a/src/AMSlib/wf/workflow.hpp b/src/AMSlib/wf/workflow.hpp index 9807461c..9743457e 100644 --- a/src/AMSlib/wf/workflow.hpp +++ b/src/AMSlib/wf/workflow.hpp @@ -27,6 +27,7 @@ #ifdef __ENABLE_MPI__ #include + #include "wf/redist_load.hpp" #endif @@ -101,7 +102,8 @@ class AMSWorkflow */ void Store(size_t num_elements, std::vector &inputs, - std::vector &outputs) + std::vector &outputs, + bool *predicate = nullptr) { // 1 MB of buffer size; // TODO: Fix magic number @@ -116,54 +118,54 @@ class AMSWorkflow if (!DB) return; std::vector hInputs, hOutputs; + bool *hPredicate = nullptr; - if (appDataLoc == 
AMSResourceType::AMS_HOST) - return DB->store(num_elements, inputs, outputs); - - // Compute number of elements that fit inside the buffer - size_t bElements = bSize / sizeof(FPTypeValue); - FPTypeValue *pPtr = - rm.allocate(bElements, AMSResourceType::AMS_PINNED); - // Total inner vector dimensions (inputs and outputs) - size_t totalDims = inputs.size() + outputs.size(); - // Compute number of elements of each outer dimension that fit in buffer - size_t elPerDim = static_cast(floor(bElements / totalDims)); - - for (int i = 0; i < inputs.size(); i++) - hInputs.push_back(&pPtr[i * elPerDim]); - - for (int i = 0; i < outputs.size(); i++) - hOutputs.push_back(&pPtr[(i + inputs.size()) * elPerDim]); - - // Iterate over all chunks - for (int i = 0; i < num_elements; i += elPerDim) { - size_t actualElems = std::min(elPerDim, num_elements - i); - // Copy input data to host - for (int k = 0; k < numIn; k++) { - rm.copy(&inputs[k][i], - AMSResourceType::AMS_DEVICE, - hInputs[k], - AMSResourceType::AMS_HOST, - actualElems); - } + if (appDataLoc == AMSResourceType::AMS_HOST) { + return DB->store(num_elements, inputs, outputs, predicate); + } - // Copy output data to host - for (int k = 0; k < numIn; k++) { - rm.copy(&outputs[k][i], - AMSResourceType::AMS_DEVICE, - hOutputs[k], - AMSResourceType::AMS_HOST, - actualElems); - } + for (int i = 0; i < inputs.size(); i++) { + FPTypeValue *pPtr = + rm.allocate(num_elements, AMSResourceType::AMS_HOST); + rm.copy(inputs[i], AMS_DEVICE, pPtr, AMS_HOST, num_elements); + hInputs.push_back(pPtr); + } - // Store to database - DB->store(actualElems, hInputs, hOutputs); + for (int i = 0; i < outputs.size(); i++) { + FPTypeValue *pPtr = + rm.allocate(num_elements, AMSResourceType::AMS_HOST); + rm.copy(outputs[i], AMS_DEVICE, pPtr, AMS_HOST, num_elements); + hOutputs.push_back(pPtr); } - rm.deallocate(pPtr, AMSResourceType::AMS_PINNED); + + if (predicate) { + hPredicate = rm.allocate(num_elements, AMSResourceType::AMS_HOST); + 
rm.copy(predicate, AMS_DEVICE, hPredicate, AMS_HOST, num_elements); + } + + // Store to database + DB->store(num_elements, hInputs, hOutputs, hPredicate); + rm.deallocate(hInputs, AMSResourceType::AMS_HOST); + rm.deallocate(hOutputs, AMSResourceType::AMS_HOST); + if (predicate) rm.deallocate(hPredicate, AMSResourceType::AMS_HOST); return; } + void Store(size_t num_elements, + std::vector &inputs, + std::vector &outputs, + bool *predicate = nullptr) + { + std::vector mInputs; + for (auto I : inputs) { + mInputs.push_back(const_cast(I)); + } + + Store(num_elements, mInputs, outputs, predicate); + } + + public: AMSWorkflow() : AppCall(nullptr), @@ -180,6 +182,7 @@ class AMSWorkflow std::string &uq_path, std::string &surrogate_path, std::string &domain_name, + bool isDebugDB, AMSResourceType app_data_loc, FPTypeValue threshold, const AMSUQPolicy uq_policy, @@ -201,7 +204,7 @@ class AMSWorkflow DB = nullptr; auto &dbm = ams::db::DBManager::getInstance(); - DB = dbm.getDB(domainName, rId); + DB = dbm.getDB(domainName, rId, isDebugDB); UQModel = std::make_unique>( appDataLoc, uqPolicy, uq_path, nClusters, surrogate_path, threshold); } @@ -320,14 +323,14 @@ class AMSWorkflow } // The predicate with which we will split the data on a later step - bool *p_ml_acceptable = rm.allocate(totalElements, appDataLoc); + bool *predicate = rm.allocate(totalElements, appDataLoc); // ------------------------------------------------------------- // STEP 1: call the UQ module to look at input uncertainties // to decide if making a ML inference makes sense // ------------------------------------------------------------- CALIPER(CALI_MARK_BEGIN("UQ_MODULE");) - UQModel->evaluate(totalElements, origInputs, origOutputs, p_ml_acceptable); + UQModel->evaluate(totalElements, origInputs, origOutputs, predicate); CALIPER(CALI_MARK_END("UQ_MODULE");) DBG(Workflow, "Computed Predicates") @@ -343,7 +346,6 @@ class AMSWorkflow DBG(Workflow, "Allocated input resources") - bool *predicate = 
p_ml_acceptable; // ----------------------------------------------------------------- // STEP 3: call physics module only where predicate = false @@ -410,10 +412,18 @@ class AMSWorkflow if (DB) { CALIPER(CALI_MARK_BEGIN("DBSTORE");) - DBG(Workflow, - "Storing data (#elements = %d) to database", - packedElements); - Store(packedElements, packedInputs, packedOutputs); + if (!DB->storePredicate()) { + DBG(Workflow, + "Storing data (#elements = %d) to database", + packedElements); + Store(packedElements, packedInputs, packedOutputs); + } else { + DBG(Workflow, + "Storing data (#elements = %d) to database including predicates", + totalElements); + Store(totalElements, origInputs, origOutputs, predicate); + } + CALIPER(CALI_MARK_END("DBSTORE");) } @@ -425,7 +435,7 @@ class AMSWorkflow for (int i = 0; i < outputDim; i++) rm.deallocate(packedOutputs[i], appDataLoc); - rm.deallocate(p_ml_acceptable, appDataLoc); + rm.deallocate(predicate, appDataLoc); DBG(Workflow, "Finished AMSExecution") CINFO(Workflow, diff --git a/tests/AMSlib/CMakeLists.txt b/tests/AMSlib/CMakeLists.txt index 5571a922..ecba5eff 100644 --- a/tests/AMSlib/CMakeLists.txt +++ b/tests/AMSlib/CMakeLists.txt @@ -36,7 +36,13 @@ endfunction() function(INTEGRATION_TEST_ENV) JSON_TESTS("csv") + if (WITH_HDF5) JSON_TESTS("hdf5") + set(JSON_FP "${CMAKE_CURRENT_BINARY_DIR}/hdf5.json") + # Tests delta-uq models with different aggregation both models store to file with debug option set to on. 
+ add_test(NAME AMSEndToEndFromJSON::DuqMean::DuqMax::Double::DB::hdf5-debug::HOST COMMAND bash -c "AMS_OBJECTS=${JSON_FP} ${CMAKE_CURRENT_BINARY_DIR}/ams_end_to_end_env 0 8 9 \"double\" 1 1024 app_uq_mean_debug app_uq_max_debug;AMS_OBJECTS=${JSON_FP} python3 ${CMAKE_CURRENT_SOURCE_DIR}/verify_ete.py 0 8 9 \"double\" 1024 app_uq_mean_debug app_uq_max_debug") + unset(JSON_FP) + endif() endfunction() diff --git a/tests/AMSlib/json_configs/env_2_models_fs_rand_uq.json.in b/tests/AMSlib/json_configs/env_2_models_fs_rand_uq.json.in index 24c993bf..90961bdf 100644 --- a/tests/AMSlib/json_configs/env_2_models_fs_rand_uq.json.in +++ b/tests/AMSlib/json_configs/env_2_models_fs_rand_uq.json.in @@ -32,6 +32,22 @@ "threshold": 0.5, "db_label" : "duq_max" }, + "duq_max_debug": { + "uq_type": "deltaUQ", + "model_path": "@CMAKE_CURRENT_SOURCE_DIR@/uq_max_double_cpu.pt", + "uq_aggregate": "max", + "threshold": 0.5, + "db_label" : "duq_max", + "debug_db" : true + }, + "duq_mean_debug": { + "uq_type": "deltaUQ", + "model_path": "@CMAKE_CURRENT_SOURCE_DIR@/uq_mean_double_cpu.pt", + "uq_aggregate": "mean", + "threshold": 0.5, + "db_label" : "duq_mean", + "debug_db" : true + }, "random_no_db_10": { "uq_type": "random", "model_path": "@CMAKE_CURRENT_SOURCE_DIR@/linear_scripted_cpu.pt", @@ -83,6 +99,9 @@ "app_uq_mean_ndb" : "duq_mean_no_db", "app_uq_max_ndb" : "duq_max_no_db", "app_no_model" : "no_model", - "app_no_model_no_db" : "no_model_no_db" + "app_no_model_no_db" : "no_model_no_db", + "app_uq_mean_debug" : "duq_mean_debug", + "app_uq_max_debug" : "duq_max_debug" + } } diff --git a/tests/AMSlib/verify_ete.py b/tests/AMSlib/verify_ete.py index 81da2649..7f3c6c5f 100644 --- a/tests/AMSlib/verify_ete.py +++ b/tests/AMSlib/verify_ete.py @@ -17,7 +17,7 @@ def get_suffix(db_type): return "unknown" -def verify_data_collection(fs_path, db_type, num_inputs, num_outputs, name="test"): +def verify_data_collection(fs_path, db_type, num_inputs, num_outputs, name="test", debug_db=False): # 
Returns a tuple of the input/ouput data and 0/1 for correct incorrect file. # Checks whether the files also have the right number of columns if not Path(fs_path).exists(): @@ -29,6 +29,12 @@ def verify_data_collection(fs_path, db_type, num_inputs, num_outputs, name="test return None, 0 fn = f"{name}_0.{suffix}" + if debug_db and db_type != "hdf5": + print("Debug DB is only supported on hdf5") + return None, 1 + elif debug_db: + fn = f"{name}_0.debug.{suffix}" + fp = Path(f"{fs_path}/{fn}") if name == "" and fp.exists(): @@ -56,7 +62,7 @@ def verify_data_collection(fs_path, db_type, num_inputs, num_outputs, name="test elif db_type == "hdf5": with h5py.File(fp, "r") as fd: dsets = fd.keys() - assert len(dsets) == (num_inputs + num_outputs), "Expected equal number of inputs/outputs" + assert len(dsets) == (num_inputs + num_outputs + int(debug_db)), "Expected equal number of inputs/outputs" inputs = sum(1 for s in dsets if "input" in s) assert inputs == num_inputs, "Expected equal number of inputs" outputs = sum(1 for s in dsets if "output" in s) @@ -64,16 +70,24 @@ def verify_data_collection(fs_path, db_type, num_inputs, num_outputs, name="test input_data = [[] for _ in range(num_inputs)] output_data = [[] for _ in range(num_outputs)] for d in dsets: + if d == "predicate": + continue loc = int(d.split("_")[1]) if len(fd[d]): if "input" in d: input_data[loc] = fd[d][:] elif "output" in d: output_data[loc] = fd[d][:] + predicate = None + if debug_db: + predicate = np.array(fd["predicate"][:]) input_data = np.array(input_data) output_data = np.array(output_data) fp.unlink() + if debug_db: + return (input_data.T, output_data.T, predicate), 0 return (input_data.T, output_data.T), 0 + else: return None, 1 @@ -91,7 +105,9 @@ def verify( db_type, fs_path, name="test", + debug_db=False, ): + print("debug db is", debug_db) # When AMS has no model path it always calls the domain solution. 
# As such it behaves identically with threshold 0 if model_path == None or model_path == "": @@ -102,7 +118,7 @@ def verify( threshold = 1.0 if db_type != "none": - data, correct = verify_data_collection(fs_path, db_type, num_inputs, num_outputs, name) + data, correct = verify_data_collection(fs_path, db_type, num_inputs, num_outputs, name, debug_db) if correct: return 1 inputs = data[0] @@ -118,7 +134,25 @@ def verify( elif "data_type" == "float": assert inputs.dtype == np.float32, "Data types do not match" - if threshold == 0.0: + # When debug db is set, we always store all elements + if debug_db: + predicate = data[2] + assert ( + len(predicate) == num_elements + ), f"debug db should always contain all data but now it has {len(predicate)}" + assert ( + len(inputs) == num_elements and len(outputs) == num_elements + ), f"Num elements should be the same as experiment {len(inputs)} {num_elements}" + # Predicate points to 'true' when we use the model. The sum should be "equal" to + # the threshold multiplied by the number of elements + arg = sum(predicate) + actual_elems = int(threshold * num_elements) + assert arg == actual_elems, "Predicate does not accumulate to the expected value" + # Here we pick the values from input/outputs that will be selected for "domain" evaluation + # This will allow the code to verify that predicates pick the "right" values + inputs = inputs[np.logical_not(predicate)] + outputs = outputs[np.logical_not(predicate)] + elif threshold == 0.0: # Threshold 0 means collect all data. Verify the sizes. 
assert ( len(inputs) == num_elements and len(outputs) == num_elements @@ -252,6 +286,7 @@ def from_json(argv): threshold = model["threshold"] db_label = model["db_label"] model_path = model.get("model_path", None) + is_debug = model.get("debug_db", False) res = verify( use_device, num_inputs, @@ -265,6 +300,7 @@ def from_json(argv): db_type, fs_path, db_label, + is_debug, ) if res != 0: return res @@ -276,4 +312,3 @@ def from_json(argv): if "AMS_OBJECTS" in os.environ: sys.exit(from_json(sys.argv[1:])) sys.exit(from_cli(sys.argv[1:])) - pass