From e3963628b20e13e1bfe3644d4b22555c1777acb7 Mon Sep 17 00:00:00 2001 From: David Bar-On Date: Sun, 10 Dec 2023 14:53:08 +0200 Subject: [PATCH 1/2] Reimplement for Multi-thread and remove write_set --- src/iperf.h | 1 - src/iperf_api.c | 74 ++++++++++++++++++++++++++---------------- src/iperf_api.h | 8 ++++- src/iperf_client_api.c | 35 +++++++++++++++----- src/iperf_error.c | 4 --- src/iperf_server_api.c | 34 ++++++------------- src/iperf_tcp.c | 2 +- 7 files changed, 90 insertions(+), 68 deletions(-) diff --git a/src/iperf.h b/src/iperf.h index c1d839be1..d40857155 100644 --- a/src/iperf.h +++ b/src/iperf.h @@ -345,7 +345,6 @@ struct iperf_test /* Select related parameters */ int max_fd; fd_set read_set; /* set of read sockets */ - fd_set write_set; /* set of write sockets */ /* Interval related members */ int omitting; diff --git a/src/iperf_api.c b/src/iperf_api.c index d40561c10..86bdf53ad 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -1698,9 +1698,6 @@ iperf_parse_arguments(struct iperf_test *test, int argc, char **argv) } else if (test->role == 'c' && (test->server_skew_threshold != 0)){ i_errno = IESERVERONLY; return -1; - } else if (test->role == 'c' && rcv_timeout_flag && test->mode == SENDER){ - i_errno = IERVRSONLYRCVTIMEOUT; - return -1; } else if (test->role == 's' && (server_rsa_private_key || test->server_authorized_users) && !(server_rsa_private_key && test->server_authorized_users)) { i_errno = IESETSERVERAUTH; @@ -1857,13 +1854,28 @@ void iperf_close_logfile(struct iperf_test *test) int iperf_set_send_state(struct iperf_test *test, signed char state) { + int l; + const char c = 0; + + if (test->debug_level >= DEBUG_LEVEL_INFO) + fprintf(stderr, "Setting and sending new test state %d (changed from %d).\n", state, test->state); + if (test->ctrl_sck >= 0) { test->state = state; if (Nwrite(test->ctrl_sck, (char*) &state, sizeof(state), Ptcp) < 0) { i_errno = IESENDMESSAGE; return -1; } + + if (state == IPERF_DONE || state == CLIENT_TERMINATE) { + // Send additional bytes to complete the sent size to JSON length prefix, + // in case the other size is waiting for JSON. + l = sizeof(uint32_t) - sizeof(state); + while (l-- > 0) + Nwrite(test->ctrl_sck, &c, 1, Ptcp); + } } + return 0; } @@ -2386,6 +2398,9 @@ get_parameters(struct iperf_test *test) static int send_results(struct iperf_test *test) { + if (test->debug_level >= DEBUG_LEVEL_INFO) + fprintf(stderr, "Sending results.\n"); + int r = 0; cJSON *j; cJSON *j_streams; @@ -2487,6 +2502,7 @@ send_results(struct iperf_test *test) } cJSON_Delete(j); } + return r; } @@ -2495,6 +2511,9 @@ send_results(struct iperf_test *test) static int get_results(struct iperf_test *test) { + if (test->debug_level >= DEBUG_LEVEL_INFO) + fprintf(stderr, "Getting results.\n"); + int r = 0; cJSON *j; cJSON *j_cpu_util_total; @@ -3106,14 +3125,7 @@ iperf_free_test(struct iperf_test *test) free(test->remote_congestion_used); if (test->timestamp_format) free(test->timestamp_format); - if (test->omit_timer != NULL) - tmr_cancel(test->omit_timer); - if (test->timer != NULL) - tmr_cancel(test->timer); - if (test->stats_timer != NULL) - tmr_cancel(test->stats_timer); - if (test->reporter_timer != NULL) - tmr_cancel(test->reporter_timer); + iperf_cancel_test_timers(test); /* Free protocol list */ while (!SLIST_EMPTY(&test->protocols)) { @@ -3193,22 +3205,9 @@ iperf_reset_test(struct iperf_test *test) SLIST_REMOVE_HEAD(&test->streams, streams); iperf_free_stream(sp); } - if (test->omit_timer != NULL) { - tmr_cancel(test->omit_timer); - test->omit_timer = NULL; - } - if (test->timer != NULL) { - tmr_cancel(test->timer); - test->timer = NULL; - } - if (test->stats_timer != NULL) { - tmr_cancel(test->stats_timer); - test->stats_timer = NULL; - } - if (test->reporter_timer != NULL) { - tmr_cancel(test->reporter_timer); - test->reporter_timer = NULL; - } + + iperf_cancel_test_timers(test); + test->done = 0; SLIST_INIT(&test->streams); @@ -3252,7 +3251,6 @@ iperf_reset_test(struct iperf_test *test) test->no_delay = 0; FD_ZERO(&test->read_set); - FD_ZERO(&test->write_set); test->num_streams = 1; test->settings->socket_bufsize = 0; @@ -3339,6 +3337,26 @@ iperf_reset_stats(struct iperf_test *test) } } +void +iperf_cancel_test_timers(struct iperf_test *test) +{ + if (test->stats_timer != NULL) { + tmr_cancel(test->stats_timer); + test->stats_timer = NULL; + } + if (test->reporter_timer != NULL) { + tmr_cancel(test->reporter_timer); + test->reporter_timer = NULL; + } + if (test->omit_timer != NULL) { + tmr_cancel(test->omit_timer); + test->omit_timer = NULL; + } + if (test->timer != NULL) { + tmr_cancel(test->timer); + test->timer = NULL; + } +} /**************************************************************************/ diff --git a/src/iperf_api.h b/src/iperf_api.h index d2bbdfe96..0171fe7f9 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -276,6 +276,12 @@ int iperf_defaults(struct iperf_test * testp); */ void iperf_free_test(struct iperf_test * testp); +/** + * iperf_cancel_test_timers -- cancel test timers + * + */ +void iperf_cancel_test_timers(struct iperf_test * testp); + /** * iperf_new_stream -- return a net iperf_stream with default values * @@ -415,7 +421,7 @@ enum { IESKEWTHRESHOLD = 29, // Invalid value specified as skew threshold IEIDLETIMEOUT = 30, // Invalid value specified as idle state timeout IERCVTIMEOUT = 31, // Illegal message receive timeout - IERVRSONLYRCVTIMEOUT = 32, // Client receive timeout is valid only in reverse mode + // [DELETED] IERVRSONLYRCVTIMEOUT = 32, // Client receive timeout is valid only in reverse mode IESNDTIMEOUT = 33, // Illegal message send timeout IEUDPFILETRANSFER = 34, // Cannot transfer file using UDP IESERVERAUTHUSERS = 35, // Cannot access authorized users file diff --git a/src/iperf_client_api.c b/src/iperf_client_api.c index 7ad4c939b..51aece1fa 100644 --- a/src/iperf_client_api.c +++ b/src/iperf_client_api.c @@ -392,7 +392,6 @@ iperf_connect(struct iperf_test *test) return -1; } FD_ZERO(&test->read_set); - FD_ZERO(&test->write_set); make_cookie(test->cookie); @@ -499,6 +498,9 @@ iperf_connect(struct iperf_test *test) int iperf_client_end(struct iperf_test *test) { + if (test->debug_level >= DEBUG_LEVEL_INFO) + fprintf(stderr, "Ending client test.\n"); + if (NULL == test) { iperf_err(NULL, "No test\n"); @@ -533,7 +535,7 @@ iperf_run_client(struct iperf_test * test) { int startup; int result = 0; - fd_set read_set, write_set; + fd_set read_set; struct iperf_time now; struct timeval* timeout = NULL; struct iperf_stream *sp; @@ -544,6 +546,7 @@ iperf_run_client(struct iperf_test * test) int64_t t_usecs; int64_t timeout_us; int64_t rcv_timeout_us; + int64_t rcv_timeout_value_in_us; int i_errno_save; if (NULL == test) @@ -580,8 +583,9 @@ iperf_run_client(struct iperf_test * test) /* Begin calculating CPU utilization */ cpu_util(NULL); + rcv_timeout_value_in_us = (test->settings->rcv_timeout.secs * SEC_TO_US) + test->settings->rcv_timeout.usecs; if (test->mode != SENDER) - rcv_timeout_us = (test->settings->rcv_timeout.secs * SEC_TO_US) + test->settings->rcv_timeout.usecs; + rcv_timeout_us = rcv_timeout_value_in_us; else rcv_timeout_us = 0; @@ -591,12 +595,17 @@ iperf_run_client(struct iperf_test * test) startup = 1; while (test->state != IPERF_DONE) { memcpy(&read_set, &test->read_set, sizeof(fd_set)); - memcpy(&write_set, &test->write_set, sizeof(fd_set)); iperf_time_now(&now); timeout = tmr_timeout(&now); - // In reverse active mode client ensures data is received - if (test->state == TEST_RUNNING && rcv_timeout_us > 0) { + // In reverse/bidir active mode client ensures data is received. + // Same for receiving control messages at the end of the test. + if ( (rcv_timeout_us > 0 && test->state == TEST_RUNNING) + || (rcv_timeout_value_in_us > 0 + && (test->state == TEST_END + || test->state == EXCHANGE_RESULTS + || test->state == DISPLAY_RESULTS)) ) + { timeout_us = -1; if (timeout != NULL) { used_timeout.tv_sec = timeout->tv_sec; @@ -621,16 +630,22 @@ iperf_run_client(struct iperf_test * test) result = select(test->max_fd + 1, &read_set, - (test->state == TEST_RUNNING && !test->reverse) ? &write_set : NULL, + NULL, NULL, timeout); #else - result = select(test->max_fd + 1, &read_set, &write_set, NULL, timeout); + result = select(test->max_fd + 1, &read_set, NULL, NULL, timeout); #endif // __vxworks or __VXWORKS__ if (result < 0 && errno != EINTR) { i_errno = IESELECT; goto cleanup_and_fail; - } else if (result == 0 && test->state == TEST_RUNNING && rcv_timeout_us > 0) { + } else if ( result == 0 && + ((rcv_timeout_us > 0 && test->state == TEST_RUNNING) + || (rcv_timeout_value_in_us > 0 + && (test->state == TEST_END + || test->state == EXCHANGE_RESULTS + || test->state == DISPLAY_RESULTS))) ) + { /* * If nothing was received in non-reverse running state * then probably something got stuck - either client, @@ -748,6 +763,8 @@ iperf_run_client(struct iperf_test * test) test->done = 1; cpu_util(test->cpu_util); test->stats_callback(test); + // Timers not needed at test end and may interrupt with select() receive timeout + iperf_cancel_test_timers(test); if (iperf_set_send_state(test, TEST_END) != 0) goto cleanup_and_fail; } diff --git a/src/iperf_error.c b/src/iperf_error.c index 0fedf3110..d2ee9a1a8 100644 --- a/src/iperf_error.c +++ b/src/iperf_error.c @@ -377,10 +377,6 @@ iperf_strerror(int int_errno) case IEUDPFILETRANSFER: snprintf(errstr, len, "cannot transfer file using UDP"); break; - case IERVRSONLYRCVTIMEOUT: - snprintf(errstr, len, "client receive timeout is valid only in receiving mode"); - perr = 1; - break; case IEDAEMON: snprintf(errstr, len, "unable to become a daemon"); perr = 1; diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index 3bd20c62d..9032d9bda 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -124,7 +124,6 @@ iperf_server_listen(struct iperf_test *test) } FD_ZERO(&test->read_set); - FD_ZERO(&test->write_set); FD_SET(test->listener, &test->read_set); if (test->listener > test->max_fd) test->max_fd = test->listener; @@ -236,7 +235,6 @@ iperf_handle_message_server(struct iperf_test *test) test->stats_callback(test); SLIST_FOREACH(sp, &test->streams, streams) { FD_CLR(sp->socket, &test->read_set); - FD_CLR(sp->socket, &test->write_set); close(sp->socket); } test->reporter_callback(test); @@ -266,8 +264,8 @@ iperf_handle_message_server(struct iperf_test *test) iperf_err(test, "the client has terminated"); SLIST_FOREACH(sp, &test->streams, streams) { FD_CLR(sp->socket, &test->read_set); - FD_CLR(sp->socket, &test->write_set); close(sp->socket); + sp->socket = -1; } test->state = IPERF_DONE; break; @@ -294,6 +292,7 @@ server_timer_proc(TimerClientData client_data, struct iperf_time *nowP) sp = SLIST_FIRST(&test->streams); SLIST_REMOVE_HEAD(&test->streams, streams); close(sp->socket); + sp->socket = -1; iperf_free_stream(sp); } close(test->ctrl_sck); @@ -444,7 +443,6 @@ cleanup_server(struct iperf_test *test) SLIST_FOREACH(sp, &test->streams, streams) { if (sp->socket > -1) { FD_CLR(sp->socket, &test->read_set); - FD_CLR(sp->socket, &test->write_set); close(sp->socket); sp->socket = -1; } @@ -463,28 +461,13 @@ cleanup_server(struct iperf_test *test) close(test->prot_listener); test->prot_listener = -1; } - - /* Cancel any remaining timers. */ - if (test->stats_timer != NULL) { - tmr_cancel(test->stats_timer); - test->stats_timer = NULL; - } - if (test->reporter_timer != NULL) { - tmr_cancel(test->reporter_timer); - test->reporter_timer = NULL; - } - if (test->omit_timer != NULL) { - tmr_cancel(test->omit_timer); - test->omit_timer = NULL; - } + + iperf_cancel_test_timers(test); /* Cancel any remaining timers. */ + if (test->congestion_used != NULL) { free(test->congestion_used); test->congestion_used = NULL; } - if (test->timer != NULL) { - tmr_cancel(test->timer); - test->timer = NULL; - } } @@ -560,7 +543,6 @@ iperf_run_server(struct iperf_test *test) } memcpy(&read_set, &test->read_set, sizeof(fd_set)); - memcpy(&write_set, &test->write_set, sizeof(fd_set)); iperf_time_now(&now); timeout = tmr_timeout(&now); @@ -572,7 +554,11 @@ iperf_run_server(struct iperf_test *test) used_timeout.tv_usec = 0; timeout = &used_timeout; } - } else if (test->mode != SENDER) { // In non-reverse active mode server ensures data is received + } else if (test->mode != SENDER // In non-reverse active mode server ensures data is received. + || test->state == TEST_END // Same for receiving control messages at the end of the test. + || test->state == EXCHANGE_RESULTS + || test->state == DISPLAY_RESULTS) + { timeout_us = -1; if (timeout != NULL) { used_timeout.tv_sec = timeout->tv_sec; diff --git a/src/iperf_tcp.c b/src/iperf_tcp.c index 71c40a6ea..91b025c13 100644 --- a/src/iperf_tcp.c +++ b/src/iperf_tcp.c @@ -69,7 +69,7 @@ iperf_tcp_recv(struct iperf_stream *sp) } else { if (sp->test->debug) - printf("Late receive, state = %d\n", sp->test->state); + printf("Late receive, state = %d, bytes received = %d\n", sp->test->state, r); } return r; From fcf1c46213fb5fff3c4e14f5e32ba3e866111218 Mon Sep 17 00:00:00 2001 From: David Bar-On Date: Fri, 10 May 2024 09:49:26 +0300 Subject: [PATCH 2/2] Completely remove serevr write_set per comment received --- src/iperf_server_api.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index 9032d9bda..9ec9437ac 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -480,7 +480,7 @@ iperf_run_server(struct iperf_test *test) #if defined(HAVE_TCP_CONGESTION) int saved_errno; #endif /* HAVE_TCP_CONGESTION */ - fd_set read_set, write_set; + fd_set read_set; struct iperf_stream *sp; struct iperf_time now; struct iperf_time last_receive_time; @@ -576,7 +576,7 @@ iperf_run_server(struct iperf_test *test) timeout = &used_timeout; } - result = select(test->max_fd + 1, &read_set, &write_set, NULL, timeout); + result = select(test->max_fd + 1, &read_set, NULL, NULL, timeout); if (result < 0 && errno != EINTR) { cleanup_server(test); i_errno = IESELECT;