Skip to content

Commit

Permalink
Committing clang-format changes
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] committed Sep 18, 2024
1 parent 7bd3cf9 commit 2c8fcb2
Show file tree
Hide file tree
Showing 115 changed files with 4,924 additions and 5,250 deletions.
1 change: 0 additions & 1 deletion benchmark/test_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
* have access to the file, you may request a copy from help@hdfgroup.org. *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */


#ifndef HRUN_TEST_UNIT_IPC_TEST_INIT_H_
#define HRUN_TEST_UNIT_IPC_TEST_INIT_H_

Expand Down
17 changes: 2 additions & 15 deletions hermes_adapters/adapter_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,10 @@
namespace hermes::adapter {

/** Adapter types */
enum class AdapterType {
kNone,
kPosix,
kStdio,
kMpiio,
kPubsub,
kVfd
};
enum class AdapterType { kNone, kPosix, kStdio, kMpiio, kPubsub, kVfd };

/** Adapter modes */
enum class AdapterMode {
kNone,
kDefault,
kBypass,
kScratch,
kWorkflow
};
enum class AdapterMode { kNone, kDefault, kBypass, kScratch, kWorkflow };

/**
* Per-Object Adapter Settings.
Expand Down
136 changes: 73 additions & 63 deletions hermes_adapters/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@

#include <ftw.h>
#include <mpi.h>

#include <filesystem>
#include <future>
#include <set>
#include <string>

#include "data_stager/factory/binary_stager.h"
#include "filesystem_io_client.h"
#include "filesystem_mdm.h"
#include "hermes/bucket.h"
#include "hermes/hermes.h"

#include "filesystem_mdm.h"
#include "filesystem_io_client.h"
#include "hermes_adapters/mapper/mapper_factory.h"
#include "data_stager/factory/binary_stager.h"
#include <filesystem>


namespace hermes::adapter {

Expand All @@ -53,8 +52,7 @@ class Filesystem : public FilesystemIoClient {

public:
/** Constructor */
explicit Filesystem(AdapterType type)
: type_(type) {}
explicit Filesystem(AdapterType type) : type_(type) {}

/** open \a path */
File Open(AdapterStat &stat, const std::string &path) {
Expand Down Expand Up @@ -97,18 +95,23 @@ class Filesystem : public FilesystemIoClient {
// Update page size
stat.page_size_ = mdm->GetAdapterPageSize(path);
// Bucket parameters
ctx.bkt_params_ = hermes::data_stager::BinaryFileStager::BuildFileParams(stat.page_size_);
ctx.bkt_params_ =
hermes::data_stager::BinaryFileStager::BuildFileParams(
stat.page_size_);
// Get or create the bucket
if (stat.hflags_.Any(HERMES_FS_TRUNC)) {
// The file was opened with TRUNCATION
stat.bkt_id_ = HERMES->GetBucket(stat.path_, ctx, 0, HERMES_SHOULD_STAGE);
stat.bkt_id_ =
HERMES->GetBucket(stat.path_, ctx, 0, HERMES_SHOULD_STAGE);
stat.bkt_id_.Clear();
} else {
// The file was opened regularly
stat.file_size_ = GetBackendSize(*path_shm);
stat.bkt_id_ = HERMES->GetBucket(stat.path_, ctx, stat.file_size_, HERMES_SHOULD_STAGE);
stat.bkt_id_ = HERMES->GetBucket(stat.path_, ctx, stat.file_size_,
HERMES_SHOULD_STAGE);
}
HILOG(kDebug, "BKT vs file size: {} {}", stat.bkt_id_.GetSize(), stat.file_size_);
HILOG(kDebug, "BKT vs file size: {} {}", stat.bkt_id_.GetSize(),
stat.file_size_);
// Update file position pointer
if (stat.hflags_.Any(HERMES_FS_APPEND)) {
stat.st_ptr_ = std::numeric_limits<size_t>::max();
Expand All @@ -117,7 +120,7 @@ class Filesystem : public FilesystemIoClient {
}
// Allocate internal hermes data
auto stat_ptr = std::make_shared<AdapterStat>(stat);
FilesystemIoClientState fs_ctx(&mdm->fs_mdm_, (void *) stat_ptr.get());
FilesystemIoClientState fs_ctx(&mdm->fs_mdm_, (void *)stat_ptr.get());
HermesOpen(f, stat, fs_ctx);
mdm->Create(f, stat_ptr);
} else {
Expand All @@ -134,23 +137,24 @@ class Filesystem : public FilesystemIoClient {
size_t Write(File &f, AdapterStat &stat, const void *ptr, size_t off,
size_t total_size, IoStatus &io_status,
FsIoOptions opts = FsIoOptions()) {
(void) f;
(void)f;
hapi::Bucket &bkt = stat.bkt_id_;
std::string filename = bkt.GetName();
bool is_append = stat.st_ptr_ == std::numeric_limits<size_t>::max();

HILOG(kDebug, "Write called for filename: {}"
" on offset: {}"
" from position: {}"
" and size: {}"
" and adapter mode: {}",
HILOG(kDebug,
"Write called for filename: {}"
" on offset: {}"
" from position: {}"
" and size: {}"
" and adapter mode: {}",
filename, off, stat.st_ptr_, total_size,
AdapterModeConv::str(stat.adapter_mode_))
if (stat.adapter_mode_ == AdapterMode::kBypass) {
// Bypass mode is handled differently
opts.backend_size_ = total_size;
opts.backend_off_ = off;
Blob blob_wrap((char*)ptr, total_size);
Blob blob_wrap((char *)ptr, total_size);
WriteBlob(bkt.GetName(), blob_wrap, opts, io_status);
if (!io_status.success_) {
HILOG(kDebug, "Failed to write blob of size {} to backend",
Expand All @@ -167,7 +171,7 @@ class Filesystem : public FilesystemIoClient {

if (is_append) {
// Perform append
const Blob page((const char*)ptr, total_size);
const Blob page((const char *)ptr, total_size);
bkt.Append(page, stat.page_size_, ctx);
} else {
// Fragment I/O request into pages
Expand All @@ -178,7 +182,7 @@ class Filesystem : public FilesystemIoClient {

// Perform a PartialPut for each page
for (const BlobPlacement &p : mapping) {
const Blob page((const char*)ptr + data_offset, p.blob_size_);
const Blob page((const char *)ptr + data_offset, p.blob_size_);
std::string blob_name(p.CreateBlobName().str());
bkt.AsyncPartialPut(blob_name, page, p.blob_off_, ctx);
data_offset += p.blob_size_;
Expand All @@ -191,24 +195,25 @@ class Filesystem : public FilesystemIoClient {
io_status.size_ = total_size;
UpdateIoStatus(opts, io_status);

HILOG(kDebug, "The size of file after write: {}",
GetSize(f, stat))
HILOG(kDebug, "The size of file after write: {}", GetSize(f, stat))
return total_size;
}

/** base read function */
template<bool ASYNC>
size_t BaseRead(File &f, AdapterStat &stat, void *ptr, size_t off,
size_t total_size, size_t req_id,
std::vector<LPointer<hrunpq::TypedPushTask<GetBlobTask>>> &tasks,
IoStatus &io_status, FsIoOptions opts = FsIoOptions()) {
(void) f;
template <bool ASYNC>
size_t BaseRead(
File &f, AdapterStat &stat, void *ptr, size_t off, size_t total_size,
size_t req_id,
std::vector<LPointer<hrunpq::TypedPushTask<GetBlobTask>>> &tasks,
IoStatus &io_status, FsIoOptions opts = FsIoOptions()) {
(void)f;
hapi::Bucket &bkt = stat.bkt_id_;

HILOG(kDebug, "Read called for filename: {}"
" on offset: {}"
" from position: {}"
" and size: {}",
HILOG(kDebug,
"Read called for filename: {}"
" on offset: {}"
" from position: {}"
" and size: {}",
stat.path_, off, stat.st_ptr_, total_size)

// SEEK_END is not a valid read position
Expand All @@ -230,7 +235,7 @@ class Filesystem : public FilesystemIoClient {
// Bypass mode is handled differently
opts.backend_size_ = total_size;
opts.backend_off_ = off;
Blob blob_wrap((char *) ptr, total_size);
Blob blob_wrap((char *)ptr, total_size);
ReadBlob(bkt.GetName(), blob_wrap, opts, io_status);
if (!io_status.success_) {
HILOG(kDebug, "Failed to read blob of size {} from backend",
Expand All @@ -254,7 +259,7 @@ class Filesystem : public FilesystemIoClient {
Context ctx;
ctx.flags_.SetBits(HERMES_SHOULD_STAGE);
for (const BlobPlacement &p : mapping) {
Blob page((const char*)ptr + data_offset, p.blob_size_);
Blob page((const char *)ptr + data_offset, p.blob_size_);
std::string blob_name(p.CreateBlobName().str());
if constexpr (ASYNC) {
LPointer<hrunpq::TypedPushTask<GetBlobTask>> task =
Expand All @@ -278,15 +283,16 @@ class Filesystem : public FilesystemIoClient {
}

/** read */
size_t Read(File &f, AdapterStat &stat, void *ptr,
size_t off, size_t total_size,
IoStatus &io_status, FsIoOptions opts = FsIoOptions()) {
size_t Read(File &f, AdapterStat &stat, void *ptr, size_t off,
size_t total_size, IoStatus &io_status,
FsIoOptions opts = FsIoOptions()) {
std::vector<LPointer<hrunpq::TypedPushTask<GetBlobTask>>> tasks;
return BaseRead<false>(f, stat, ptr, off, total_size, 0, tasks, io_status, opts);
return BaseRead<false>(f, stat, ptr, off, total_size, 0, tasks, io_status,
opts);
}

/** write asynchronously */
FsAsyncTask* AWrite(File &f, AdapterStat &stat, const void *ptr, size_t off,
FsAsyncTask *AWrite(File &f, AdapterStat &stat, const void *ptr, size_t off,
size_t total_size, size_t req_id, IoStatus &io_status,
FsIoOptions opts = FsIoOptions()) {
// Writes are completely async at this time
Expand All @@ -298,27 +304,30 @@ class Filesystem : public FilesystemIoClient {
}

/** read asynchronously */
FsAsyncTask* ARead(File &f, AdapterStat &stat, void *ptr, size_t off,
size_t total_size, size_t req_id,
IoStatus &io_status, FsIoOptions opts = FsIoOptions()) {
FsAsyncTask *ARead(File &f, AdapterStat &stat, void *ptr, size_t off,
size_t total_size, size_t req_id, IoStatus &io_status,
FsIoOptions opts = FsIoOptions()) {
FsAsyncTask *fstask = new FsAsyncTask();
BaseRead<true>(f, stat, ptr, off, total_size, req_id, fstask->get_tasks_, io_status, opts);
BaseRead<true>(f, stat, ptr, off, total_size, req_id, fstask->get_tasks_,
io_status, opts);
fstask->io_status_ = io_status;
fstask->opts_ = opts;
return fstask;
}

/** wait for \a req_id request ID */
size_t Wait(FsAsyncTask *fstask) {
for (LPointer<hrunpq::TypedPushTask<PutBlobTask>> &push_task : fstask->put_tasks_) {
for (LPointer<hrunpq::TypedPushTask<PutBlobTask>> &push_task :
fstask->put_tasks_) {
push_task->Wait();
HRUN_CLIENT->DelTask(push_task);
}

// Update I/O status for gets
if (!fstask->get_tasks_.empty()) {
size_t get_size = 0;
for (LPointer<hrunpq::TypedPushTask<GetBlobTask>> &push_task : fstask->get_tasks_) {
for (LPointer<hrunpq::TypedPushTask<GetBlobTask>> &push_task :
fstask->get_tasks_) {
push_task->Wait();
GetBlobTask *task = push_task->get();
get_size += task->data_size_;
Expand All @@ -331,7 +340,7 @@ class Filesystem : public FilesystemIoClient {
}

/** wait for request IDs in \a req_id vector */
void Wait(std::vector<FsAsyncTask*> &req_ids, std::vector<size_t> &ret) {
void Wait(std::vector<FsAsyncTask *> &req_ids, std::vector<size_t> &ret) {
for (auto &req_id : req_ids) {
ret.emplace_back(Wait(req_id));
}
Expand Down Expand Up @@ -376,7 +385,7 @@ class Filesystem : public FilesystemIoClient {

/** file size */
size_t GetSize(File &f, AdapterStat &stat) {
(void) stat;
(void)stat;
if (stat.adapter_mode_ != AdapterMode::kBypass) {
return stat.bkt_id_.GetSize();
} else {
Expand All @@ -386,7 +395,7 @@ class Filesystem : public FilesystemIoClient {

/** tell */
size_t Tell(File &f, AdapterStat &stat) {
(void) f;
(void)f;
if (stat.st_ptr_ != std::numeric_limits<size_t>::max()) {
return stat.st_ptr_;
} else {
Expand Down Expand Up @@ -415,7 +424,7 @@ class Filesystem : public FilesystemIoClient {
int Close(File &f, AdapterStat &stat) {
Sync(f, stat);
auto mdm = HERMES_FS_METADATA_MANAGER;
FilesystemIoClientState fs_ctx(&mdm->fs_mdm_, (void*)&stat);
FilesystemIoClientState fs_ctx(&mdm->fs_mdm_, (void *)&stat);
HermesClose(f, stat, fs_ctx);
RealClose(f, stat);
mdm->Delete(stat.path_, f);
Expand Down Expand Up @@ -448,8 +457,10 @@ class Filesystem : public FilesystemIoClient {
std::list<File> files = *filesp;
for (File &f : files) {
std::shared_ptr<AdapterStat> stat = mdm->Find(f);
if (stat == nullptr) { continue; }
FilesystemIoClientState fs_ctx(&mdm->fs_mdm_, (void *) &stat);
if (stat == nullptr) {
continue;
}
FilesystemIoClientState fs_ctx(&mdm->fs_mdm_, (void *)&stat);
HermesClose(f, *stat, fs_ctx);
RealClose(f, *stat);
mdm->Delete(stat->path_, f);
Expand Down Expand Up @@ -485,17 +496,16 @@ class Filesystem : public FilesystemIoClient {
}

/** write asynchronously */
FsAsyncTask* AWrite(File &f, AdapterStat &stat, const void *ptr,
FsAsyncTask *AWrite(File &f, AdapterStat &stat, const void *ptr,
size_t total_size, size_t req_id, IoStatus &io_status,
FsIoOptions opts) {
size_t off = stat.st_ptr_;
return AWrite(f, stat, ptr, off, total_size, req_id, io_status, opts);
}

/** read asynchronously */
FsAsyncTask* ARead(File &f, AdapterStat &stat, void *ptr, size_t total_size,
size_t req_id,
IoStatus &io_status, FsIoOptions opts) {
FsAsyncTask *ARead(File &f, AdapterStat &stat, void *ptr, size_t total_size,
size_t req_id, IoStatus &io_status, FsIoOptions opts) {
size_t off = stat.st_ptr_;
return ARead(f, stat, ptr, off, total_size, req_id, io_status, opts);
}
Expand Down Expand Up @@ -563,10 +573,10 @@ class Filesystem : public FilesystemIoClient {
}

/** write asynchronously */
FsAsyncTask* AWrite(File &f, bool &stat_exists, const void *ptr,
FsAsyncTask *AWrite(File &f, bool &stat_exists, const void *ptr,
size_t total_size, size_t req_id,
std::vector<PutBlobTask*> &tasks,
IoStatus &io_status, FsIoOptions opts) {
std::vector<PutBlobTask *> &tasks, IoStatus &io_status,
FsIoOptions opts) {
auto mdm = HERMES_FS_METADATA_MANAGER;
auto stat = mdm->Find(f);
if (!stat) {
Expand All @@ -578,7 +588,7 @@ class Filesystem : public FilesystemIoClient {
}

/** read asynchronously */
FsAsyncTask* ARead(File &f, bool &stat_exists, void *ptr, size_t total_size,
FsAsyncTask *ARead(File &f, bool &stat_exists, void *ptr, size_t total_size,
size_t req_id, IoStatus &io_status, FsIoOptions opts) {
auto mdm = HERMES_FS_METADATA_MANAGER;
auto stat = mdm->Find(f);
Expand All @@ -591,7 +601,7 @@ class Filesystem : public FilesystemIoClient {
}

/** write \a off offset asynchronously */
FsAsyncTask* AWrite(File &f, bool &stat_exists, const void *ptr, size_t off,
FsAsyncTask *AWrite(File &f, bool &stat_exists, const void *ptr, size_t off,
size_t total_size, size_t req_id, IoStatus &io_status,
FsIoOptions opts) {
auto mdm = HERMES_FS_METADATA_MANAGER;
Expand All @@ -606,7 +616,7 @@ class Filesystem : public FilesystemIoClient {
}

/** read \a off offset asynchronously */
FsAsyncTask* ARead(File &f, bool &stat_exists, void *ptr, size_t off,
FsAsyncTask *ARead(File &f, bool &stat_exists, void *ptr, size_t off,
size_t total_size, size_t req_id, IoStatus &io_status,
FsIoOptions opts) {
auto mdm = HERMES_FS_METADATA_MANAGER;
Expand Down
Loading

0 comments on commit 2c8fcb2

Please sign in to comment.