Skip to content

Commit

Permalink
Update/refactoring for MarshallerThread - lingering int <-> streamsiz…
Browse files Browse the repository at this point in the history
…e issues.
  • Loading branch information
jgallagher59701 committed Nov 7, 2024
1 parent 4b7c265 commit 07f4800
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 57 deletions.
62 changes: 17 additions & 45 deletions MarshallerThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@
using namespace libdap;
using namespace std;

#if 0
#undef TIMING
#if TIMING
bool MarshallerThread::print_time = false;

/**
* Use this with timeval structures returned by gettimeofday() to compute
* real time (instead of user time that is returned by std::clock() or
* get_rusage()).
*/
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start) {
/* Perform the carry for the later subtraction by updating y. */
if (stop->tv_usec < start->tv_usec) {
int nsec = (start->tv_usec - stop->tv_usec) / 1000000 + 1;
Expand Down Expand Up @@ -96,8 +96,6 @@ Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) : m_mute
if (status != 0)
throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
}
if (count != 0)
throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");

DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
}
Expand All @@ -109,10 +107,6 @@ Locker::~Locker() {
DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);

(void)pthread_mutex_unlock(&m_mutex);
#if 0
int status = pthread_mutex_unlock(&m_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
#endif
}

/**
Expand All @@ -129,7 +123,8 @@ Locker::~Locker() {
* summary return.
*/

ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count): m_mutex(lock), m_cond(cond), m_count(count) {
ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count)
: m_mutex(lock), m_cond(cond), m_count(count) {
int status = pthread_mutex_lock(&m_mutex);

DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
Expand All @@ -147,56 +142,31 @@ ChildLocker::~ChildLocker() {

(void)pthread_cond_signal(&m_cond);
(void)pthread_mutex_unlock(&m_mutex);

#if 0
int status = pthread_cond_signal(&m_cond);
if (status != 0)
throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!");

status = pthread_mutex_unlock(&m_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
#endif
}

MarshallerThread::MarshallerThread() : d_thread(0), d_child_thread_count(0) {
MarshallerThread::MarshallerThread() : d_thread(nullptr), d_child_thread_count(0) {
if (pthread_attr_init(&d_thread_attr) != 0)
throw Error(internal_error, "Failed to initialize pthread attributes.");
if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
throw Error(internal_error, "Failed to complete pthread attribute initialization.");

if (pthread_mutex_init(&d_out_mutex, 0) != 0)
if (pthread_mutex_init(&d_out_mutex, nullptr) != 0)
throw Error(internal_error, "Failed to initialize mutex.");
if (pthread_cond_init(&d_out_cond, 0) != 0)
if (pthread_cond_init(&d_out_cond, nullptr) != 0)
throw Error(internal_error, "Failed to initialize cond.");
}

MarshallerThread::~MarshallerThread() {
(void)pthread_mutex_lock(&d_out_mutex);
#if 0
int status = pthread_mutex_lock(&d_out_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
#endif
// d_child_thread_count is passed into the thread in a structure (see write_thread())
// and is decremented by the ChildLocker dtor when write_thread() exits. jhrg 2/7/19
if (d_child_thread_count != 0) {
(void)pthread_cond_wait(&d_out_cond, &d_out_mutex);
d_child_thread_count = 0;
#if 0
status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
#endif
}

(void)pthread_mutex_unlock(&d_out_mutex);

#if 0
if (d_child_thread_count != 0)
throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");

status = pthread_mutex_unlock(&d_out_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
#endif

pthread_mutex_destroy(&d_out_mutex);
pthread_cond_destroy(&d_out_cond);

Expand All @@ -209,7 +179,7 @@ MarshallerThread::~MarshallerThread() {
* bytes from 'byte_buf' to the output stream 'out'
*
*/
void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf, std::streamsize bytes) {
void MarshallerThread::start_thread(void *(*thread)(void *arg), ostream &out, char *byte_buf, std::streamsize bytes) {
auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf, bytes);
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
if (status != 0)
Expand All @@ -219,7 +189,7 @@ void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, ch
/**
* Write 'bytes' bytes from 'byte_buf' to the file descriptor 'fd'.
*/
void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes) {
void MarshallerThread::start_thread(void *(*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes) {
auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf, bytes);
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
if (status != 0)
Expand All @@ -240,9 +210,10 @@ void *MarshallerThread::write_thread(void *arg) {

ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit

#if 0
#if TIMING
struct timeval tp_s;
if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
if (print_time && gettimeofday(&tp_s, 0) != 0)
cerr << "could not read time" << endl;
#endif

// force an error
Expand All @@ -265,10 +236,11 @@ void *MarshallerThread::write_thread(void *arg) {
delete[] args->d_buf;
delete args;

#if 0
#if TIMING
struct timeval tp_e;
if (print_time) {
if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
if (gettimeofday(&tp_e, 0) != 0)
cerr << "could not read time" << endl;

cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
}
Expand All @@ -282,7 +254,7 @@ void *MarshallerThread::write_thread(void *arg) {
* by the ostream element of write_args. This is used by start_thread()
* and passed to pthread_create()
*
* @note This differers from MarshallerThread::write_thread() in that it
* @note This differs from MarshallerThread::write_thread() in that it
* writes data starting _after_ the four-byte length prefix that XDR
* adds to the data. It is used for the put_vector_part() calls in
* XDRStreamMarshaller.
Expand Down
24 changes: 12 additions & 12 deletions MarshallerThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ namespace libdap {
*/
class Locker {
public:
Locker() = delete;
Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);
virtual ~Locker();

private:
pthread_mutex_t &m_mutex;

Locker();
Locker(const Locker &rhs);
};

Expand All @@ -73,16 +73,15 @@ class Locker {
*/
class ChildLocker {
public:
ChildLocker() = delete;
ChildLocker(const Locker &rhs) = delete;
ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);
virtual ~ChildLocker();

private:
pthread_mutex_t &m_mutex;
pthread_cond_t &m_cond;
int &m_count;

ChildLocker();
ChildLocker(const Locker &rhs);
};

/**
Expand Down Expand Up @@ -115,23 +114,24 @@ class MarshallerThread {
pthread_cond_t &d_cond;
int &d_count;
std::string &d_error;
std::ostream &d_out; // The output stream protected by the mutex, ...
int d_out_file; // file descriptor; if not -1, use this.
char *d_buf; // The data to write to the stream
int d_num; // The size of d_buf
std::ostream &d_out; // The output stream protected by the mutex, ...
int d_out_file; // file descriptor; if not -1, use this.
char *d_buf; // The data to write to the stream
std::streamsize d_num; // The size of d_buf

/**
* Build args for an ostream. The file descriptor is set to -1
*/
write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, std::ostream &s, char *vals,
std::streamsize num)
std::streamsize num)
: d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(s), d_out_file(-1), d_buf(vals), d_num(num) {}

/**
* Build args for a file descriptr. The ostream is set to cerr (because it is
* a reference and has to be initialized to something).
*/
write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, int fd, char *vals, int num)
write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, int fd, char *vals,
std::streamsize num)
: d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(std::cerr), d_out_file(fd), d_buf(vals),
d_num(num) {}
};
Expand All @@ -146,8 +146,8 @@ class MarshallerThread {
int &get_child_thread_count() { return d_child_thread_count; }
void increment_child_thread_count() { ++d_child_thread_count; }

void start_thread(void* (*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written);
void start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes_written);
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written);
void start_thread(void *(*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes_written);

// These are static so they will have c-linkage - required because they
// are passed to pthread_create()
Expand Down

0 comments on commit 07f4800

Please sign in to comment.