Skip to content

Commit

Permalink
Xsputn (#250)
Browse files Browse the repository at this point in the history
Changed byte counts in MarshallerThread from int to std::streamsize

---------
Co-authored-by: ndp-opendap <ndp@opendap.org>
Co-authored-by: James Gallagher <jgallagher@opendap.org>
  • Loading branch information
travis-ci-opendap authored Nov 7, 2024
1 parent 6ab3774 commit 4b7c265
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 67 deletions.
26 changes: 12 additions & 14 deletions MarshallerThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ Locker::~Locker() {
* the invariant if there is an error and the code exits with a
* summary return.
*/
ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count)
: m_mutex(lock), m_cond(cond), m_count(count) {

ChildLocker::ChildLocker(pthread_mutex_t &lock, pthread_cond_t &cond, int &count): m_mutex(lock), m_cond(cond), m_count(count) {
int status = pthread_mutex_lock(&m_mutex);

DBG(cerr << "Locking the mutex! (simple; " << pthread_self() << ")" << endl);
Expand Down Expand Up @@ -209,9 +209,8 @@ MarshallerThread::~MarshallerThread() {
* bytes from 'byte_buf' to the output stream 'out'
*
*/
void MarshallerThread::start_thread(void *(*thread)(void *arg), ostream &out, char *byte_buf, unsigned int bytes) {
write_args *args =
new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf, bytes);
void MarshallerThread::start_thread(void* (*thread)(void *arg), ostream &out, char *byte_buf, std::streamsize bytes) {
auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, out, byte_buf, bytes);
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
if (status != 0)
throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
Expand All @@ -220,9 +219,8 @@ void MarshallerThread::start_thread(void *(*thread)(void *arg), ostream &out, ch
/**
* Write 'bytes' bytes from 'byte_buf' to the file descriptor 'fd'.
*/
void MarshallerThread::start_thread(void *(*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes) {
write_args *args =
new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf, bytes);
void MarshallerThread::start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes) {
auto *args = new write_args(d_out_mutex, d_out_cond, d_child_thread_count, d_thread_error, fd, byte_buf, bytes);
int status = pthread_create(&d_thread, &d_thread_attr, thread, args);
if (status != 0)
throw InternalErr(__FILE__, __LINE__, "Could not start child thread");
Expand All @@ -238,7 +236,7 @@ void MarshallerThread::start_thread(void *(*thread)(void *arg), int fd, char *by
* -1, then use that, else use the ostream reference.
*/
void *MarshallerThread::write_thread(void *arg) {
write_args *args = reinterpret_cast<write_args *>(arg);
auto *args = reinterpret_cast<write_args *>(arg);

ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit

Expand All @@ -251,7 +249,7 @@ void *MarshallerThread::write_thread(void *arg) {
// return (void*)-1;

if (args->d_out_file != -1) {
int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
auto bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
if (bytes_written != args->d_num)
return (void *)-1;
} else {
Expand All @@ -276,7 +274,7 @@ void *MarshallerThread::write_thread(void *arg) {
}
#endif

return 0;
return nullptr;
}

/**
Expand All @@ -292,12 +290,12 @@ void *MarshallerThread::write_thread(void *arg) {
* @return 0 if successful, -1 otherwise.
*/
void *MarshallerThread::write_thread_part(void *arg) {
write_args *args = reinterpret_cast<write_args *>(arg);
auto *args = reinterpret_cast<write_args *>(arg);

ChildLocker lock(args->d_mutex, args->d_cond, args->d_count); // RAII; will unlock on exit

if (args->d_out_file != -1) {
int bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
auto bytes_written = write(args->d_out_file, args->d_buf, args->d_num);
if (bytes_written != args->d_num)
return (void *)-1;
} else {
Expand All @@ -313,5 +311,5 @@ void *MarshallerThread::write_thread_part(void *arg) {
delete[] args->d_buf;
delete args;

return 0;
return nullptr;
}
6 changes: 3 additions & 3 deletions MarshallerThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class MarshallerThread {
* Build args for an ostream. The file descriptor is set to -1
*/
write_args(pthread_mutex_t &m, pthread_cond_t &c, int &count, std::string &e, std::ostream &s, char *vals,
int num)
std::streamsize num)
: d_mutex(m), d_cond(c), d_count(count), d_error(e), d_out(s), d_out_file(-1), d_buf(vals), d_num(num) {}

/**
Expand All @@ -146,8 +146,8 @@ class MarshallerThread {
int &get_child_thread_count() { return d_child_thread_count; }
void increment_child_thread_count() { ++d_child_thread_count; }

void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written);
void start_thread(void *(*thread)(void *arg), int fd, char *byte_buf, unsigned int bytes_written);
void start_thread(void* (*thread)(void *arg), std::ostream &out, char *byte_buf, std::streamsize bytes_written);
void start_thread(void* (*thread)(void *arg), int fd, char *byte_buf, std::streamsize bytes_written);

// These are static so they will have c-linkage - required because they
// are passed to pthread_create()
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Please find the libdap4 API documentation here: https://opendap.github.io/libdap4/html/

* Merge PR from fork regarding configure.ac use of bash-specific syntax.
* Merge xsputn fix for Marshaller code and likely buffer size issues.

## Updated for version 3.21.0 [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.10564122.svg)](https://doi.org/10.5281/zenodo.10564122)

Expand Down
84 changes: 42 additions & 42 deletions tests/TestInt32.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,46 +69,46 @@ TestInt32 &TestInt32::operator=(const TestInt32 &rhs) {
return *this;
}

BaseType *TestInt32::ptr_duplicate() { return new TestInt32(*this); }

void TestInt32::output_values(std::ostream &out) { print_val(out, "", false); }

bool TestInt32::read() {
if (read_p())
return true;

if (test_variable_sleep_interval > 0)
sleep(test_variable_sleep_interval);

if (get_series_values()) {
// This line stopped working when I upgraded the compiler on osx 10.9.
// to version Apple LLVM version 5.1 (clang-503.0.38) (based on LLVM 3.4svn)
// jhrg 3/12/14
// d_buf = d_buf * 32;
//
// d_buf <<= 5;
// if (!d_buf)
// d_buf = 32;
// The above version caused runtime errors on OSX 14.6.1:
//
//. +TestInt32.cc:88:15: runtime error: left shift of 1073741824 by 5 places cannot be represented in type
//'dods_int32' (aka 'int')
// +SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior TestInt32.cc:88:15 in
//
// Replaced with a slower but cautious version below - ndp 09/3/2024
long long val = d_buf;
val <<= 5;
if (val >= 4294967296) {
d_buf = 32;
} else {
d_buf = 0xFFFFFFFF & val;
}
DBGN(cerr << __PRETTY_FUNCTION__ << "d_buf: " << d_buf << endl);

} else {
d_buf = 123456789;
}

set_read_p(true);
return true;
BaseType *
TestInt32::ptr_duplicate()
{
return new TestInt32(*this);
}

void
TestInt32::output_values(std::ostream &out)
{
print_val(out, "", false);
}

bool TestInt32::read()
{
if (read_p()) return true;

if (test_variable_sleep_interval > 0) sleep(test_variable_sleep_interval);

if (get_series_values()) {

// I added this in order to quell complaints from ASAN vis-a-vis
// runtime error: left shift of 1073741824 by 5 places cannot be represented in type 'dods_int32' (aka 'int')
// ndp 05/23/24
d_buf &= 0x07FFFFFF;

// This line stopped working when I upgraded the compiler on osx 10.9.
// to version Apple LLVM version 5.1 (clang-503.0.38) (based on LLVM 3.4svn)
// jhrg 3/12/14
// d_buf = d_buf * 32;
d_buf <<= 5;
if (!d_buf)
d_buf = 32;

DBGN(cerr << __PRETTY_FUNCTION__ << "d_buf: " << d_buf << endl);
}
else {
d_buf = 123456789;
}

set_read_p(true);

return true;
}
7 changes: 3 additions & 4 deletions unit-tests/HTTPConnectTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,12 @@ class HTTPConnectTest : public TestFixture {
// etag = "\"2a008e-157-3fbcd139c2680\"";
// etag = "\"181893-157-3fbcd139c2680\""; // On 10/13/14 we moved to a new httpd and the etag value changed.
// etag ="\"157-3df1e87884680\""; // New httpd service, new etag, ndp - 01/11/21
// etag = "\"157-3df0e26958000\""; // New httpd (dockerized), new etag. ndp - 12/06/22
// etag = "\"157-3df0e26958000\"";// New httpd (dockerized), new etag. ndp - 12/06/22
// lm = "Wed, 13 Jul 2005 19:32:26 GMT";

etag = "\"157-5ef05adba5432\""; // New deploymewnt in us-west, new etag. ndp - 09/04/24
lm = "Sun, 04 Dec 2022 19:35:52 GMT"; // New deploymewnt in us-west, new etag. ndp - 09/04/24
DBG(cerr << prolog << "etag: " << etag << endl);
DBG(cerr << prolog << "lm: " << lm << endl);
DBG(cerr << prolog << "etag: " << etag<< endl);
DBG(cerr << prolog << "lm: " << lm<< endl);

string u("jimg");
string dt(":dods_test@");
Expand Down
135 changes: 131 additions & 4 deletions unit-tests/chunked_iostream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@

#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <cerrno>
#include <cstdlib>

#include <fstream>
#include <iostream>
#include <string>
#include <limits> // std::numeric_limits
#include <exception> // std::exception

#include "chunked_istream.h"
#include "chunked_ostream.h"
Expand All @@ -43,12 +46,29 @@

#include "debug.h"

const string path = (string)TEST_SRC_DIR + "/chunked-io";
#undef DBG
#define DBG(x) do { if (debug) (x); } while(false)
#define prolog string("chunked_iostream_test::").append(__func__).append("() - ")

const string path = (string) TEST_SRC_DIR + "/chunked-io";

using namespace std;
using namespace CppUnit;
using namespace libdap;


string the_test_text="Stephen could remember an evening when he had sat there in the warm,\n"
"deepening twilight, watching the sea; it had barely a ruffle on its surface,\n"
"and yet the Sophie picked up enough moving air with her topgallants\n"
"to draw a long straight whispering furrow across the water, a line\n"
"brilliant with unearthly phosphorescence, visible for quarter of a mile behind her.\n"
"Days and nights of unbelievable purity. Nights when the steady Ionian breeze\n"
"rounded the square mainsail – not a brace to be touched, watch relieving watch –\n"
"and he and Jack on deck, sawing away, sawing away, lost in their music,\n"
"until the falling dew untuned their strings. And days when the perfection\n"
"of dawn was so great, the emptiness so entire, that men were almost afraid to speak.\n";


/**
* The intent is to test writing to and reading from a chunked iostream,
* using various combinations of chunk/buffer sizes and character red/write
Expand All @@ -68,7 +88,11 @@ class chunked_iostream_test : public TestFixture {
chunked_iostream_test() {}
~chunked_iostream_test() {}

void setUp() {

void setUp()
{
DBG( cerr << "\n");

big_file = path + "/test_big_binary_file.bin";
big_file_2 = path + "/test_big_binary_file_2.bin";
big_file_3 = path + "/test_big_binary_file_3.bin"; // not used yet
Expand Down Expand Up @@ -527,7 +551,110 @@ class chunked_iostream_test : public TestFixture {
}
}

CPPUNIT_TEST_SUITE(chunked_iostream_test);
string tf(bool val){
return val?"true":"false";
}

string check_stream(const chunked_ostream &os){
stringstream msg;
if ( !os.good() ){
msg << prolog << "chunked_outfile::good(): " << tf(os.good()) << ").\n";
msg << prolog << "chunked_outfile::eof(): " << tf(os.eof()) << ").\n";
msg << prolog << "chunked_outfile::fail(): " << tf(os.fail()) << ").\n";
msg << prolog << "chunked_outfile::bad(): " << tf(os.bad()) << ").\n";
msg << prolog << "file: " << __FILE__ << " line: "<< __LINE__ << "\n";
}
return msg.str();
}

bool write_chunked_file(string &out_file, const uint64_t target_size){
string error_msg;
fstream outfile(out_file.c_str(), ios::out | ios::binary);
chunked_ostream chunked_outfile(outfile, the_test_text.size());
error_msg = check_stream(chunked_outfile);
if(error_msg.empty()){
the_test_text.size();
uint64_t position = 0;
uint64_t remaining;
std::streamsize outnum;
while( position < target_size){
remaining = target_size - position;
if(the_test_text.size() < remaining){
outnum = the_test_text.size();
}
else {
outnum = remaining;
}
chunked_outfile.write(the_test_text.c_str(), outnum);
error_msg = check_stream(chunked_outfile);
if ( !error_msg.empty() ){
cerr << error_msg;
return false;
}
position += outnum;
}
}
return true;
}

uint64_t read_chunked_file(string ifile, string ofile, unsigned int bufsize){
fstream infile(ifile.c_str(), ios::in | ios::binary);
if (!infile.good()) cerr << "ERROR Failed to open or encountered eof for: " << ifile << "\n";
chunked_istream chunked_infile(infile, bufsize);

fstream outfile(ofile.c_str(), ios::out | ios::binary);
if (!outfile.good()) cerr << "ERROR Failed to open or encountered eof for: " << ofile << "\n";

char str[8096];
// int count = 1;
chunked_infile.read(str, 8096);
auto num = chunked_infile.gcount();

uint64_t sz = 0;
while (num > 0 && !chunked_infile.eof()) {
outfile.write(str, num);
sz += num;
chunked_infile.read(str, 8096);
num = chunked_infile.gcount();
}
if (num > 0 && !chunked_infile.bad()) {
outfile.write(str, num);
sz += num;
}
return sz;
}

/**
* 1GB = 1073741824 bytes
* 2GB = 2147483648 bytes
* 3GB = 3221225472 bytes
* 4GB = 4294967296 bytes
* 5GB = 5368709120 bytes
*/
void write_then_read_large_chunked_file(){
DBG(cerr << "\n");

string chunked_filename = path + "/large-text-file.chunked";
DBG(cerr << prolog << " chunked_filename: " << chunked_filename << "\n");

uint64_t target_size = 1073741824ULL * 5; // 1073741824 == 1GB
DBG(cerr << prolog << " target_size: " << target_size << " bytes\n");

bool success = write_chunked_file(chunked_filename, target_size);
CPPUNIT_ASSERT( success == true );

string plain_file_out = path + "/large-text-file.plain";
DBG(cerr << prolog << " plain_file_out: " << plain_file_out << "\n");
auto size = read_chunked_file(chunked_filename, plain_file_out, 8096);
DBG(cerr << prolog << " read_chunked_file(): " << size << " bytes\n");

CPPUNIT_ASSERT( size == target_size );
}


CPPUNIT_TEST_SUITE (chunked_iostream_test);

CPPUNIT_TEST (write_then_read_large_chunked_file);

CPPUNIT_TEST(test_write_1_read_1_small_file);
CPPUNIT_TEST(test_write_1_read_1_text_file);
Expand Down

0 comments on commit 4b7c265

Please sign in to comment.