From 8c939c8c6509baaacb7668923ada2c396ae7a511 Mon Sep 17 00:00:00 2001 From: DavidBar-On Date: Sun, 22 Sep 2024 16:12:44 +0300 Subject: [PATCH] End test immediatelly when all number of blocks/bytes sent/received --- src/iperf.h | 4 ++ src/iperf_api.c | 154 ++++++++++++++++++++++++++++++++++++++--- src/iperf_api.h | 2 + src/iperf_client_api.c | 29 +++++++- 4 files changed, 180 insertions(+), 9 deletions(-) diff --git a/src/iperf.h b/src/iperf.h index 7d14a3453..0c604d49c 100644 --- a/src/iperf.h +++ b/src/iperf.h @@ -288,6 +288,7 @@ enum debug_level { struct iperf_test { pthread_mutex_t print_mutex; + pthread_mutex_t pipe_mutex; char role; /* 'c' lient or 's' erver */ enum iperf_mode mode; @@ -326,6 +327,9 @@ struct iperf_test int ctrl_sck_mss; /* MSS for the control channel */ + int pipe_end_of_test_fds[2]; /* fot select() terminating when -n/-k flags are set */ + int pipe_end_of_test_created; + #if defined(HAVE_SSL) char *server_authorized_users; EVP_PKEY *server_rsa_private_key; diff --git a/src/iperf_api.c b/src/iperf_api.c index 60efd1273..e5940b394 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -1862,6 +1862,69 @@ void iperf_close_logfile(struct iperf_test *test) } } + +/* + * Open the self pipe. + */ +void iperf_open_pipe_end_of_test(struct iperf_test *test) +{ + int flags; + int failures = 0; + + if (pipe(test->pipe_end_of_test_fds) == -1) { + warning("Failed to create pipe to end test immediatelly when sending bytes/blocks in completed"); + return; + } + /* pipe was created */ + test->pipe_end_of_test_created = 1; + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Created end of test pipe - read_fd=%d, write_fd=%d.\n", + test->pipe_end_of_test_fds[0], test->pipe_end_of_test_fds[1]); + } + /* Make read end of pipe nonblocking */ + flags = fcntl(test->pipe_end_of_test_fds[0], F_GETFL); + if (flags == -1) { + warning("Failed to get read side of pipe flags for setting O_NONBLOCK"); + failures++; + } else { + flags |= O_NONBLOCK; + if (fcntl(test->pipe_end_of_test_fds[0], F_SETFL, flags) == -1) { + warning("Failed to set read side of pipe flags to O_NONBLOCK"); + failures++; + } + } + /* Make write end of pipe nonblocking */ + if (failures == 0) { + flags = fcntl(test->pipe_end_of_test_fds[1], F_GETFL); + if (flags == -1) { + warning("Failed to get write side of pipe flags for setting O_NONBLOCK"); + failures++; + } else { + flags |= O_NONBLOCK; + if (fcntl(test->pipe_end_of_test_fds[1], F_SETFL, flags) == -1) { + warning("Failed to set write side of pipe flags to O_NONBLOCK"); + failures++; + } + } + } + + if (failures > 0) { + iperf_close_pipe_end_of_test(test); + } + + return; +} + +void iperf_close_pipe_end_of_test(struct iperf_test *test) +{ + if (test->pipe_end_of_test_created == 1) { + close(test->pipe_end_of_test_fds[0]); + close(test->pipe_end_of_test_fds[1]); + test->pipe_end_of_test_created = 0; + } +} + + int iperf_set_send_state(struct iperf_test *test, signed char state) { @@ -1997,6 +2060,7 @@ iperf_send_mt(struct iperf_stream *sp) register struct iperf_test *test = sp->test; struct iperf_time now; int no_throttle_check; + int rc; /* Can we do multisend mode? */ if (test->settings->burst != 0) @@ -2014,13 +2078,31 @@ iperf_send_mt(struct iperf_stream *sp) iperf_time_now(&now); streams_active = 0; { - if (sp->green_light && sp->sender) { + if (sp->green_light) { // XXX If we hit one of these ending conditions maybe // want to stop even trying to send something? - if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) - break; - if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks) + if ((test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) || + (test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)) + { + /* Waking up main thread when all data was sent */ + if (test->pipe_end_of_test_created == 1) { + /* If write fail on EINTR, etc. retry will be done in next send */ + if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) { + perror("iperf_send_mt: pthread_mutex_lock pipe_mutex for already sent"); + } + rc = write(test->pipe_end_of_test_fds[1], "x", 1); + if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) { + perror("iperf_send_mt: pthread_mutex_unlock pipe_mutex for already sent"); + } + if (rc <= 0) { + warning("Failed to write to pipe to end test when all blocks or bytes already sent"); + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "All blocks or bytes already sent. Wrote to pipe to wakeup main thread.\n"); + } + } break; + } if ((r = sp->snd(sp)) < 0) { if (r == NET_SOFTERROR) break; @@ -2031,6 +2113,28 @@ iperf_send_mt(struct iperf_stream *sp) test->bytes_sent += r; if (!sp->pending_size) ++test->blocks_sent; + /* Waking up main thread when all data was sent */ + if ((test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) || + (test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)) + { + if (test->pipe_end_of_test_created == 1) { + /* If write fail on EINTR, etc. retry will be done in next send */ + if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) { + perror("iperf_send_mt: pthread_mutex_lock pipe_mutex for all sent"); + } + rc = write(test->pipe_end_of_test_fds[1], "x", 1); + if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) { + perror("iperf_send_mt: pthread_mutex_unlock pipe_mutex for all sent"); + } + if (rc <= 0) { + warning("Failed to write to pipe to end test when all blocks or bytes sent"); + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "All blocks or bytes sent. Wrote to pipe to wakeup main thread.\n"); + } + } + break; + } if (no_throttle_check) iperf_check_throttle(sp, &now); } @@ -2053,7 +2157,7 @@ iperf_send_mt(struct iperf_stream *sp) int iperf_recv_mt(struct iperf_stream *sp) { - int r; + int r, rc; struct iperf_test *test = sp->test; if ((r = sp->rcv(sp)) < 0) { @@ -2067,6 +2171,26 @@ iperf_recv_mt(struct iperf_stream *sp) if (r > 0) { test->bytes_received += r; ++test->blocks_received; + /* Waking up main thread when all data was received */ + if (test->pipe_end_of_test_created == 1 && + ((test->settings->bytes != 0 && test->bytes_received >= test->settings->bytes) || + (test->settings->blocks != 0 && test->blocks_received >= test->settings->blocks))) + { + /* If write fail on EINTR, etc. retry will be done in next read */ + if (pthread_mutex_lock(&(test->pipe_mutex)) != 0) { + perror("iperf_recv_mt: pthread_mutex_lock pipe_mutex for all received"); + } + rc = write(test->pipe_end_of_test_fds[1], "x", 1); + if (pthread_mutex_unlock(&(test->pipe_mutex)) != 0) { + perror("iperf_recv_mt: pthread_mutex_unlock pipe_mutex for all received"); + } + if (rc <= 0) { + warning("Failed to write to pipe to end test when all blocks or bytes received"); + } + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "All blocks or bytes received. Wrote to pipe to wakeup main thread.\n"); + } + } } return 0; @@ -2951,9 +3075,13 @@ iperf_new_test() } if (pthread_mutex_init(&(test->print_mutex), &mutexattr) != 0) { - perror("iperf_new_test: pthread_mutex_init"); + perror("iperf_new_test: pthread_mutex_init print_mutex"); } + pthread_mutexattr_destroy(&mutexattr); + if (pthread_mutex_init(&(test->pipe_mutex), &mutexattr) != 0) { + perror("iperf_new_test: pthread_mutex_init pipe_mutex"); + } pthread_mutexattr_destroy(&mutexattr); test->settings = (struct iperf_settings *) malloc(sizeof(struct iperf_settings)); @@ -3054,6 +3182,7 @@ iperf_defaults(struct iperf_test *testp) testp->settings->rcv_timeout.secs = DEFAULT_NO_MSG_RCVD_TIMEOUT / SEC_TO_mS; testp->settings->rcv_timeout.usecs = (DEFAULT_NO_MSG_RCVD_TIMEOUT % SEC_TO_mS) * mS_TO_US; testp->zerocopy = 0; + testp->pipe_end_of_test_created = 0; memset(testp->cookie, 0, COOKIE_SIZE); @@ -3130,6 +3259,7 @@ iperf_defaults(struct iperf_test *testp) void iperf_free_test(struct iperf_test *test) { + int rc; struct protocol *prot; struct iperf_stream *sp; @@ -3210,11 +3340,10 @@ iperf_free_test(struct iperf_test *test) } /* Destroy print mutex. iperf_printf() doesn't work after this point */ - int rc; rc = pthread_mutex_destroy(&(test->print_mutex)); if (rc != 0) { errno = rc; - perror("iperf_free_test: pthread_mutex_destroy"); + perror("iperf_free_test: pthread_mutex_destroy print_mutex"); } if (test->logfile) { @@ -3223,6 +3352,14 @@ iperf_free_test(struct iperf_test *test) iperf_close_logfile(test); } + iperf_close_pipe_end_of_test(test); + /* Destroy end of test pipe mutex. */ + rc = pthread_mutex_destroy(&(test->pipe_mutex)); + if (rc != 0) { + errno = rc; + perror("iperf_free_test: pthread_mutex_destroy pipe_mutex"); + } + if (test->server_output_text) { free(test->server_output_text); test->server_output_text = NULL; @@ -3351,6 +3488,7 @@ iperf_reset_test(struct iperf_test *test) test->settings->tos = 0; test->settings->dont_fragment = 0; test->zerocopy = 0; + test->pipe_end_of_test_created = 0; #if defined(HAVE_SSL) if (test->settings->authtoken) { diff --git a/src/iperf_api.h b/src/iperf_api.h index 2b71613e9..b09ecb061 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -336,6 +336,8 @@ int iperf_create_send_timers(struct iperf_test *); int iperf_parse_arguments(struct iperf_test *, int, char **); int iperf_open_logfile(struct iperf_test *); void iperf_close_logfile(struct iperf_test *); +void iperf_open_pipe_end_of_test(struct iperf_test *); +void iperf_close_pipe_end_of_test(struct iperf_test *); void iperf_reset_test(struct iperf_test *); void iperf_reset_stats(struct iperf_test * test); diff --git a/src/iperf_client_api.c b/src/iperf_client_api.c index d2542f717..452c0d80d 100644 --- a/src/iperf_client_api.c +++ b/src/iperf_client_api.c @@ -572,6 +572,7 @@ iperf_run_client(struct iperf_test * test) int64_t timeout_us; int64_t rcv_timeout_us; int i_errno_save; + char chr; if (NULL == test) { @@ -684,12 +685,25 @@ iperf_run_client(struct iperf_test * test) } if (result > 0) { - if (FD_ISSET(test->ctrl_sck, &read_set)) { + if (FD_ISSET(test->ctrl_sck, &read_set)) { /* Control message */ if (iperf_handle_message_client(test) < 0) { goto cleanup_and_fail; } FD_CLR(test->ctrl_sck, &read_set); } + + /* Wakedup by sent all of data */ + if (test->pipe_end_of_test_created && FD_ISSET(test->pipe_end_of_test_fds[0], &read_set)) { + if (test->debug_level >= DEBUG_LEVEL_INFO) { + iperf_printf(test, "Main thread woke up by pipe input that all blocks or bytes already sent/received.\n"); + } + for (;;) { /* Consume bytes from pipe */ + if (read(test->pipe_end_of_test_fds[0], &chr, 1) <= 0) { + break; + } + } + FD_CLR(test->pipe_end_of_test_fds[0], &read_set); + } } if (test->state == TEST_RUNNING) { @@ -698,6 +712,19 @@ iperf_run_client(struct iperf_test * test) if (startup) { startup = 0; + /* Create self pipe to allow select() end when number of bytes/block were sent */ + if (test->settings->bytes > 0 || test->settings->blocks > 0) { + iperf_open_pipe_end_of_test(test); + if (test->pipe_end_of_test_created == 0) { + warning("Failed to create pipe to end test immediatelly when sending bytes/blocks in completed"); + } else { /* pipe was created */ + /* Add the pipe to the select() read fds */ + FD_SET(test->pipe_end_of_test_fds[0], &test->read_set); + test->max_fd = (test->pipe_end_of_test_fds[0] > test->max_fd) ? + test->pipe_end_of_test_fds[0] : test->max_fd; + } + } + /* Create and spin up threads */ pthread_attr_t attr; if (pthread_attr_init(&attr) != 0) {