Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

End test immediately when all number of blocks/bytes sent/received #1775

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/iperf.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ enum debug_level {
struct iperf_test
{
pthread_mutex_t print_mutex;
pthread_mutex_t pipe_mutex;

char role; /* 'c' lient or 's' erver */
enum iperf_mode mode;
Expand Down Expand Up @@ -326,6 +327,9 @@ struct iperf_test

int ctrl_sck_mss; /* MSS for the control channel */

int pipe_end_of_test_fds[2]; /* fot select() terminating when -n/-k flags are set */
int pipe_end_of_test_created;

#if defined(HAVE_SSL)
char *server_authorized_users;
EVP_PKEY *server_rsa_private_key;
Expand Down
154 changes: 146 additions & 8 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1862,6 +1862,69 @@ void iperf_close_logfile(struct iperf_test *test)
}
}


/*
* Open the self pipe.
*/
void iperf_open_pipe_end_of_test(struct iperf_test *test)
{
int flags;
int failures = 0;

if (pipe(test->pipe_end_of_test_fds) == -1) {
warning("Failed to create pipe to end test immediatelly when sending bytes/blocks in completed");
return;
}
/* pipe was created */
test->pipe_end_of_test_created = 1;
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Created end of test pipe - read_fd=%d, write_fd=%d.\n",
test->pipe_end_of_test_fds[0], test->pipe_end_of_test_fds[1]);
}
/* Make read end of pipe nonblocking */
flags = fcntl(test->pipe_end_of_test_fds[0], F_GETFL);
if (flags == -1) {
warning("Failed to get read side of pipe flags for setting O_NONBLOCK");
failures++;
} else {
flags |= O_NONBLOCK;
if (fcntl(test->pipe_end_of_test_fds[0], F_SETFL, flags) == -1) {
warning("Failed to set read side of pipe flags to O_NONBLOCK");
failures++;
}
}
/* Make write end of pipe nonblocking */
if (failures == 0) {
flags = fcntl(test->pipe_end_of_test_fds[1], F_GETFL);
if (flags == -1) {
warning("Failed to get write side of pipe flags for setting O_NONBLOCK");
failures++;
} else {
flags |= O_NONBLOCK;
if (fcntl(test->pipe_end_of_test_fds[1], F_SETFL, flags) == -1) {
warning("Failed to set write side of pipe flags to O_NONBLOCK");
failures++;
}
}
}

if (failures > 0) {
iperf_close_pipe_end_of_test(test);
}

return;
}

void iperf_close_pipe_end_of_test(struct iperf_test *test)
{
if (test->pipe_end_of_test_created == 1) {
close(test->pipe_end_of_test_fds[0]);
close(test->pipe_end_of_test_fds[1]);
test->pipe_end_of_test_created = 0;
}
}


int
iperf_set_send_state(struct iperf_test *test, signed char state)
{
Expand Down Expand Up @@ -1997,6 +2060,7 @@ iperf_send_mt(struct iperf_stream *sp)
register struct iperf_test *test = sp->test;
struct iperf_time now;
int no_throttle_check;
int rc;

/* Can we do multisend mode? */
if (test->settings->burst != 0)
Expand All @@ -2014,13 +2078,31 @@ iperf_send_mt(struct iperf_stream *sp)
iperf_time_now(&now);
streams_active = 0;
{
if (sp->green_light && sp->sender) {
if (sp->green_light) {
// XXX If we hit one of these ending conditions maybe
// want to stop even trying to send something?
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
if ((test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) ||
(test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks))
{
/* Waking up main thread when all data was sent */
if (test->pipe_end_of_test_created == 1) {
/* If write fail on EINTR, etc. retry will be done in next send */
if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_lock pipe_mutex for already sent");
}
rc = write(test->pipe_end_of_test_fds[1], "x", 1);
if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_unlock pipe_mutex for already sent");
}
if (rc <= 0) {
warning("Failed to write to pipe to end test when all blocks or bytes already sent");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All blocks or bytes already sent. Wrote to pipe to wakeup main thread.\n");
}
}
break;
}
if ((r = sp->snd(sp)) < 0) {
if (r == NET_SOFTERROR)
break;
Expand All @@ -2031,6 +2113,28 @@ iperf_send_mt(struct iperf_stream *sp)
test->bytes_sent += r;
if (!sp->pending_size)
++test->blocks_sent;
/* Waking up main thread when all data was sent */
if ((test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) ||
(test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks))
{
if (test->pipe_end_of_test_created == 1) {
/* If write fail on EINTR, etc. retry will be done in next send */
if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_lock pipe_mutex for all sent");
}
rc = write(test->pipe_end_of_test_fds[1], "x", 1);
if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) {
perror("iperf_send_mt: pthread_mutex_unlock pipe_mutex for all sent");
}
if (rc <= 0) {
warning("Failed to write to pipe to end test when all blocks or bytes sent");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All blocks or bytes sent. Wrote to pipe to wakeup main thread.\n");
}
}
break;
}
if (no_throttle_check)
iperf_check_throttle(sp, &now);
}
Expand All @@ -2053,7 +2157,7 @@ iperf_send_mt(struct iperf_stream *sp)
int
iperf_recv_mt(struct iperf_stream *sp)
{
int r;
int r, rc;
struct iperf_test *test = sp->test;

if ((r = sp->rcv(sp)) < 0) {
Expand All @@ -2067,6 +2171,26 @@ iperf_recv_mt(struct iperf_stream *sp)
if (r > 0) {
test->bytes_received += r;
++test->blocks_received;
/* Waking up main thread when all data was received */
if (test->pipe_end_of_test_created == 1 &&
((test->settings->bytes != 0 && test->bytes_received >= test->settings->bytes) ||
(test->settings->blocks != 0 && test->blocks_received >= test->settings->blocks)))
{
/* If write fail on EINTR, etc. retry will be done in next read */
if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) {
perror("iperf_recv_mt: pthread_mutex_lock pipe_mutex for all received");
}
rc = write(test->pipe_end_of_test_fds[1], "x", 1);
if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) {
perror("iperf_recv_mt: pthread_mutex_unlock pipe_mutex for all received");
}
if (rc <= 0) {
warning("Failed to write to pipe to end test when all blocks or bytes received");
}
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "All blocks or bytes received. Wrote to pipe to wakeup main thread.\n");
}
}
}

return 0;
Expand Down Expand Up @@ -2951,9 +3075,13 @@ iperf_new_test()
}

if (pthread_mutex_init(&(test->print_mutex), &mutexattr) != 0) {
perror("iperf_new_test: pthread_mutex_init");
perror("iperf_new_test: pthread_mutex_init print_mutex");
}
pthread_mutexattr_destroy(&mutexattr);

if (pthread_mutex_init(&(test->pipe_mutex), &mutexattr) != 0) {
perror("iperf_new_test: pthread_mutex_init pipe_mutex");
}
pthread_mutexattr_destroy(&mutexattr);

test->settings = (struct iperf_settings *) malloc(sizeof(struct iperf_settings));
Expand Down Expand Up @@ -3054,6 +3182,7 @@ iperf_defaults(struct iperf_test *testp)
testp->settings->rcv_timeout.secs = DEFAULT_NO_MSG_RCVD_TIMEOUT / SEC_TO_mS;
testp->settings->rcv_timeout.usecs = (DEFAULT_NO_MSG_RCVD_TIMEOUT % SEC_TO_mS) * mS_TO_US;
testp->zerocopy = 0;
testp->pipe_end_of_test_created = 0;

memset(testp->cookie, 0, COOKIE_SIZE);

Expand Down Expand Up @@ -3130,6 +3259,7 @@ iperf_defaults(struct iperf_test *testp)
void
iperf_free_test(struct iperf_test *test)
{
int rc;
struct protocol *prot;
struct iperf_stream *sp;

Expand Down Expand Up @@ -3210,11 +3340,10 @@ iperf_free_test(struct iperf_test *test)
}

/* Destroy print mutex. iperf_printf() doesn't work after this point */
int rc;
rc = pthread_mutex_destroy(&(test->print_mutex));
if (rc != 0) {
errno = rc;
perror("iperf_free_test: pthread_mutex_destroy");
perror("iperf_free_test: pthread_mutex_destroy print_mutex");
}

if (test->logfile) {
Expand All @@ -3223,6 +3352,14 @@ iperf_free_test(struct iperf_test *test)
iperf_close_logfile(test);
}

iperf_close_pipe_end_of_test(test);
/* Destroy end of test pipe mutex. */
rc = pthread_mutex_destroy(&(test->pipe_mutex));
if (rc != 0) {
errno = rc;
perror("iperf_free_test: pthread_mutex_destroy pipe_mutex");
}

if (test->server_output_text) {
free(test->server_output_text);
test->server_output_text = NULL;
Expand Down Expand Up @@ -3351,6 +3488,7 @@ iperf_reset_test(struct iperf_test *test)
test->settings->tos = 0;
test->settings->dont_fragment = 0;
test->zerocopy = 0;
test->pipe_end_of_test_created = 0;

#if defined(HAVE_SSL)
if (test->settings->authtoken) {
Expand Down
2 changes: 2 additions & 0 deletions src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ int iperf_create_send_timers(struct iperf_test *);
int iperf_parse_arguments(struct iperf_test *, int, char **);
int iperf_open_logfile(struct iperf_test *);
void iperf_close_logfile(struct iperf_test *);
void iperf_open_pipe_end_of_test(struct iperf_test *);
void iperf_close_pipe_end_of_test(struct iperf_test *);
void iperf_reset_test(struct iperf_test *);
void iperf_reset_stats(struct iperf_test * test);

Expand Down
29 changes: 28 additions & 1 deletion src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ iperf_run_client(struct iperf_test * test)
int64_t timeout_us;
int64_t rcv_timeout_us;
int i_errno_save;
char chr;

if (NULL == test)
{
Expand Down Expand Up @@ -684,12 +685,25 @@ iperf_run_client(struct iperf_test * test)
}

if (result > 0) {
if (FD_ISSET(test->ctrl_sck, &read_set)) {
if (FD_ISSET(test->ctrl_sck, &read_set)) { /* Control message */
if (iperf_handle_message_client(test) < 0) {
goto cleanup_and_fail;
}
FD_CLR(test->ctrl_sck, &read_set);
}

/* Wakedup by sent all of data */
if (test->pipe_end_of_test_created && FD_ISSET(test->pipe_end_of_test_fds[0], &read_set)) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Main thread woke up by pipe input that all blocks or bytes already sent/received.\n");
}
for (;;) { /* Consume bytes from pipe */
if (read(test->pipe_end_of_test_fds[0], &chr, 1) <= 0) {
break;
}
}
FD_CLR(test->pipe_end_of_test_fds[0], &read_set);
}
}

if (test->state == TEST_RUNNING) {
Expand All @@ -698,6 +712,19 @@ iperf_run_client(struct iperf_test * test)
if (startup) {
startup = 0;

/* Create self pipe to allow select() end when number of bytes/block were sent */
if (test->settings->bytes > 0 || test->settings->blocks > 0) {
iperf_open_pipe_end_of_test(test);
if (test->pipe_end_of_test_created == 0) {
warning("Failed to create pipe to end test immediatelly when sending bytes/blocks in completed");
} else { /* pipe was created */
/* Add the pipe to the select() read fds */
FD_SET(test->pipe_end_of_test_fds[0], &test->read_set);
test->max_fd = (test->pipe_end_of_test_fds[0] > test->max_fd) ?
test->pipe_end_of_test_fds[0] : test->max_fd;
}
}

/* Create and spin up threads */
pthread_attr_t attr;
if (pthread_attr_init(&attr) != 0) {
Expand Down
Loading