Skip to content

Commit

Permalink
Update/refactoring for MarshallerThread - lingering int <-> streamsiz… (
Browse files Browse the repository at this point in the history
#264)

Update/refactoring for MarshallerThread - lingering int <-> streamsize issues.

Fixed initialization if MarshallerThread.
Some warnings also fixed
  • Loading branch information
jgallagher59701 authored Nov 12, 2024
1 parent 4b7c265 commit 3ebeb96
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 153 deletions.
4 changes: 2 additions & 2 deletions D4Enum.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ class D4Enum : public BaseType {

void dump(ostream &strm) const override;

unsigned int val2buf(void *, bool);
unsigned int buf2val(void **);
unsigned int val2buf(void *, bool) override;
unsigned int buf2val(void **) override;

std::vector<BaseType *> *transform_to_dap2(AttrTable *parent_attr_table) override;
};
Expand Down
64 changes: 19 additions & 45 deletions MarshallerThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@
using namespace libdap;
using namespace std;

#if 0
// Set TIMING to 1 to enable timing output
#define TIMING 0

#if TIMING
bool MarshallerThread::print_time = false;

/**
* Use this with timeval structures returned by gettimeofday() to compute
* real time (instead of user time that is returned by std::clock() or
* get_rusage()).
*/
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start)
{
static double time_diff_to_hundredths(struct timeval *stop, struct timeval *start) {
/* Perform the carry for the later subtraction by updating y. */
if (stop->tv_usec < start->tv_usec) {
int nsec = (start->tv_usec - stop->tv_usec) / 1000000 + 1;
Expand Down Expand Up @@ -96,8 +98,6 @@ Locker::Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count) : m_mute
if (status != 0)
throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
}
if (count != 0)
throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");

DBG(cerr << "Locked! (" << pthread_self() << ")" << endl);
}
Expand All @@ -109,10 +109,6 @@ Locker::~Locker() {
DBG(cerr << "Unlocking the mutex! (" << pthread_self() << ")" << endl);

(void)pthread_mutex_unlock(&m_mutex);
#if 0
int status = pthread_mutex_unlock(&m_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
#endif
}

/**
Expand All @@ -129,7 +125,8 @@ Locker::~Locker() {
* summary return.
*/

ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count): m_mutex(lock), m_cond(cond), m_count(count) {
ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count)
: m_mutex(lock), m_cond(cond), m_count(count) {
int status = pthread_mutex_lock(&m_mutex);

DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
Expand All @@ -147,56 +144,31 @@ ChildLocker::~ChildLocker() {

(void)pthread_cond_signal(&m_cond);
(void)pthread_mutex_unlock(&m_mutex);

#if 0
int status = pthread_cond_signal(&m_cond);
if (status != 0)
throw InternalErr(__FILE__, __LINE__, "Could not signal main thread from ChildLocker!");

status = pthread_mutex_unlock(&m_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
#endif
}

MarshallerThread::MarshallerThread() : d_thread(0), d_child_thread_count(0) {
MarshallerThread::MarshallerThread() {
if (pthread_attr_init(&d_thread_attr) != 0)
throw Error(internal_error, "Failed to initialize pthread attributes.");
if (pthread_attr_setdetachstate(&d_thread_attr, PTHREAD_CREATE_DETACHED /*PTHREAD_CREATE_JOINABLE*/) != 0)
throw Error(internal_error, "Failed to complete pthread attribute initialization.");

if (pthread_mutex_init(&d_out_mutex, 0) != 0)
if (pthread_mutex_init(&d_out_mutex, nullptr) != 0)
throw Error(internal_error, "Failed to initialize mutex.");
if (pthread_cond_init(&d_out_cond, 0) != 0)
if (pthread_cond_init(&d_out_cond, nullptr) != 0)
throw Error(internal_error, "Failed to initialize cond.");
}

MarshallerThread::~MarshallerThread() {
(void)pthread_mutex_lock(&d_out_mutex);
#if 0
int status = pthread_mutex_lock(&d_out_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not lock m_mutex");
#endif
// d_child_thread_count is passed into the thread in a structure (see write_thread())
// and is decremented by the ChildLocker dtor when write_thread() exits. jhrg 2/7/19
if (d_child_thread_count != 0) {
(void)pthread_cond_wait(&d_out_cond, &d_out_mutex);
d_child_thread_count = 0;
#if 0
status = pthread_cond_wait(&d_out_cond, &d_out_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not wait on m_cond");
#endif
}

(void)pthread_mutex_unlock(&d_out_mutex);

#if 0
if (d_child_thread_count != 0)
throw InternalErr(__FILE__, __LINE__, "FAIL: left m_cond wait with non-zero child thread count");

status = pthread_mutex_unlock(&d_out_mutex);
if (status != 0) throw InternalErr(__FILE__, __LINE__, "Could not unlock m_mutex");
#endif

pthread_mutex_destroy(&d_out_mutex);
pthread_cond_destroy(&d_out_cond);

Expand All @@ -209,7 +181,7 @@ MarshallerThread::~MarshallerThread() {
* bytes from 'byte_buf' to the output stream 'out'
*
*/
void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf, std::streamsize bytes) {
void MarshallerThread::start_thread(void *(*thread)(void *arg), ostream &out, char *byte_buf, std::streamsize bytes) {
auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf, bytes);
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
if (status != 0)
Expand All @@ -219,7 +191,7 @@ void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, ch
/**
* Write 'bytes' bytes from 'byte_buf' to the file descriptor 'fd'.
*/
void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes) {
void MarshallerThread::start_thread(void *(*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes) {
auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf, bytes);
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
if (status != 0)
Expand All @@ -240,9 +212,10 @@ void *MarshallerThread::write_thread(void *arg) {

ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit

#if 0
#if TIMING
struct timeval tp_s;
if (print_time && gettimeofday(&tp_s, 0) != 0) cerr << "could not read time" << endl;
if (print_time && gettimeofday(&tp_s, 0) != 0)
cerr << "could not read time" << endl;
#endif

// force an error
Expand All @@ -265,10 +238,11 @@ void *MarshallerThread::write_thread(void *arg) {
delete[] args->d_buf;
delete args;

#if 0
#if TIMING
struct timeval tp_e;
if (print_time) {
if (gettimeofday(&tp_e, 0) != 0) cerr << "could not read time" << endl;
if (gettimeofday(&tp_e, 0) != 0)
cerr << "could not read time" << endl;

cerr << "time for child thread write: " << time_diff_to_hundredths(&tp_e, &tp_s) << endl;
}
Expand All @@ -282,7 +256,7 @@ void *MarshallerThread::write_thread(void *arg) {
* by the ostream element of write_args. This is used by start_thread()
* and passed to pthread_create()
*
* @note This differers from MarshallerThread::write_thread() in that it
* @note This differs from MarshallerThread::write_thread() in that it
* writes data starting _after_ the four-byte length prefix that XDR
* adds to the data. It is used for the put_vector_part() calls in
* XDRStreamMarshaller.
Expand Down
30 changes: 15 additions & 15 deletions MarshallerThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ namespace libdap {
*/
class Locker {
public:
Locker() = delete;
Locker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);
virtual ~Locker();

private:
pthread_mutex_t &m_mutex;

Locker();
Locker(const Locker &rhs);
};

Expand All @@ -73,16 +73,15 @@ class Locker {
*/
class ChildLocker {
public:
ChildLocker() = delete;
ChildLocker(const Locker &rhs) = delete;
ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count);
virtual ~ChildLocker();

private:
pthread_mutex_t &m_mutex;
pthread_cond_t &m_cond;
int &m_count;

ChildLocker();
ChildLocker(const Locker &rhs);
};

/**
Expand All @@ -95,14 +94,14 @@ class ChildLocker {
*/
class MarshallerThread {
private:
pthread_t d_thread;
pthread_t d_thread = 0;
pthread_attr_t d_thread_attr;

pthread_mutex_t d_out_mutex;
pthread_cond_t d_out_cond;

int d_child_thread_count; // 0 or 1
std::string d_thread_error; // non-null indicates an error
int d_child_thread_count = 0; // 0 or 1
std::string d_thread_error; // non-null indicates an error

/**
* Used to pass information into the static methods that run the
Expand All @@ -115,23 +114,24 @@ class MarshallerThread {
pthread_cond_t &d_cond;
int &d_count;
std::string &d_error;
std::ostream &d_out; // The output stream protected by the mutex, ...
int d_out_file; // file descriptor; if not -1, use this.
char *d_buf; // The data to write to the stream
int d_num; // The size of d_buf
std::ostream &d_out; // The output stream protected by the mutex, ...
int d_out_file; // file descriptor; if not -1, use this.
char *d_buf; // The data to write to the stream
std::streamsize d_num; // The size of d_buf

/**
* Build args for an ostream. The file descriptor is set to -1
*/
write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, std::ostream &s, char *vals,
std::streamsize num)
std::streamsize num)
: d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(s), d_out_file(-1), d_buf(vals), d_num(num) {}

/**
* Build args for a file descriptr. The ostream is set to cerr (because it is
* a reference and has to be initialized to something).
*/
write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, int fd, char *vals, int num)
write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, int fd, char *vals,
std::streamsize num)
: d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(std::cerr), d_out_file(fd), d_buf(vals),
d_num(num) {}
};
Expand All @@ -146,8 +146,8 @@ class MarshallerThread {
int &get_child_thread_count() { return d_child_thread_count; }
void increment_child_thread_count() { ++d_child_thread_count; }

void start_thread(void* (*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written);
void start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes_written);
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written);
void start_thread(void *(*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes_written);

// These are static so they will have c-linkage - required because they
// are passed to pthread_create()
Expand Down
2 changes: 1 addition & 1 deletion ce_expr.lex
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ NEVER [^\-+a-zA-Z0-9_/%.\\:,(){}[\]&<>=~]
<quote><<EOF>> {
BEGIN(INITIAL); /* resetting the state is needed for reentrant parsers */
char msg[256];
sprintf(msg, "Unterminated quote\n");
snprintf(msg, 256, "Unterminated quote\n");
YY_FATAL_ERROR(msg);
}
Expand Down
48 changes: 20 additions & 28 deletions tests/TestInt32.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,46 +69,38 @@ TestInt32 &TestInt32::operator=(const TestInt32 &rhs) {
return *this;
}

BaseType *
TestInt32::ptr_duplicate()
{
return new TestInt32(*this);
}
BaseType *TestInt32::ptr_duplicate() { return new TestInt32(*this); }

void
TestInt32::output_values(std::ostream &out)
{
print_val(out, "", false);
}
void TestInt32::output_values(std::ostream &out) { print_val(out, "", false); }

bool TestInt32::read()
{
if (read_p()) return true;
bool TestInt32::read() {
if (read_p())
return true;

if (test_variable_sleep_interval > 0) sleep(test_variable_sleep_interval);
if (test_variable_sleep_interval > 0)
sleep(test_variable_sleep_interval);

if (get_series_values()) {
if (get_series_values()) {

// I added this in order to quell complaints from ASAN vis-a-vis
// runtime error: left shift of 1073741824 by 5 places cannot be represented in type 'dods_int32' (aka 'int')
// ndp 05/23/24
d_buf &= 0x07FFFFFF;

// This line stopped working when I upgraded the compiler on osx 10.9.
// to version Apple LLVM version 5.1 (clang-503.0.38) (based on LLVM 3.4svn)
// jhrg 3/12/14
// d_buf = d_buf * 32;
// This line stopped working when I upgraded the compiler on osx 10.9.
// to version Apple LLVM version 5.1 (clang-503.0.38) (based on LLVM 3.4svn)
// jhrg 3/12/14
// d_buf = d_buf * 32;
d_buf <<= 5;
if (!d_buf)
d_buf = 32;
if (!d_buf)
d_buf = 32;

DBGN(cerr << __PRETTY_FUNCTION__ << "d_buf: " << d_buf << endl);
}
else {
d_buf = 123456789;
}
DBGN(cerr << __PRETTY_FUNCTION__ << "d_buf: " << d_buf << endl);
} else {
d_buf = 123456789;
}

set_read_p(true);
set_read_p(true);

return true;
return true;
}
2 changes: 2 additions & 0 deletions unit-tests/AttrTableTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class AttrTableTest : public TestFixture {
private:
AttrTable *at1;
AttrTable *cont_a, *cont_b, *cont_c, *cont_ba, *cont_ca, *cont_caa;
#if 0
char a[1024];
#endif

public:
AttrTableTest() {}
Expand Down
6 changes: 3 additions & 3 deletions unit-tests/HTTPConnectTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class HTTPConnectTest : public TestFixture {
// lm = "Wed, 13 Jul 2005 19:32:26 GMT";
etag = "\"157-5ef05adba5432\""; // New deploymewnt in us-west, new etag. ndp - 09/04/24
lm = "Sun, 04 Dec 2022 19:35:52 GMT"; // New deploymewnt in us-west, new etag. ndp - 09/04/24
DBG(cerr << prolog << "etag: " << etag<< endl);
DBG(cerr << prolog << "lm: " << lm<< endl);
DBG(cerr << prolog << "etag: " << etag << endl);
DBG(cerr << prolog << "lm: " << lm << endl);

string u("jimg");
string dt(":dods_test@");
Expand All @@ -126,7 +126,7 @@ class HTTPConnectTest : public TestFixture {
DBG(cerr << prolog << "netcdf_das_url: " << netcdf_das_url << endl);
}

void tearDown() {
void tearDown() override {
// normal code doesn't do this - it happens at exit() but not doing
// this here make valgrind think there are leaks.
http->d_http_cache->delete_instance();
Expand Down
Loading

0 comments on commit 3ebeb96

Please sign in to comment.