Skip to content

Commit

Permalink
do not account latencies for --first-message
Browse files Browse the repository at this point in the history
  • Loading branch information
lwalkin committed Apr 14, 2015
1 parent fc21e3d commit 9ee4e19
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 39 deletions.
3 changes: 2 additions & 1 deletion ChangeLog
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

0.?:
* --enable-asan and --enable-tsan flags to enable address/thread sanitizer.
* --enable-asan and --enable-tsan flags to enable address/thread sanitizer.
* Do not account latencies for --first-message.

0.4.9: 2015-Apr-09
* Maximum storable latency increased from 10s to 100s.
Expand Down
89 changes: 51 additions & 38 deletions src/tcpkali_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ struct connection {
off_t write_offset;
atomic_wide_t data_sent;
atomic_wide_t data_rcvd;
atomic_wide_t data_sent_reported;
atomic_wide_t data_rcvd_reported;
float channel_eol_point; /* End of life time, since epoch */
enum type {
CONN_OUTGOING,
Expand Down Expand Up @@ -589,19 +591,7 @@ static void expire_channel_lives(TK_P_ tk_timer UNUSED *w, int UNUSED revents) {
}

static void stats_timer_cb(TK_P_ tk_timer UNUSED *w, int UNUSED revents) {
struct loop_arguments *largs = tk_userdata(TK_A);

/*
* Move the connections' stats.data.ptr out into the atomically managed
* thread-specific aggregate counters.
*/
struct connection *conn;
TAILQ_FOREACH(conn, &largs->open_conns, hook) {
atomic_add(&largs->worker_data_sent, conn->data_sent);
atomic_add(&largs->worker_data_rcvd, conn->data_rcvd);
conn->data_sent = 0;
conn->data_rcvd = 0;
}
connections_flush_stats(TK_A);
}

static void *single_engine_loop_thread(void *argp) {
Expand Down Expand Up @@ -1155,6 +1145,7 @@ static enum {
ev_timer_init(&conn->timer, conn_timer_cb, delay, 0.0);
ev_timer_start(TK_A_ &conn->timer);
#endif
*suggested_move_size = 0;
return LB_GO_SLEEP;
} else {
/* smallest_blk allowed
Expand Down Expand Up @@ -1388,7 +1379,7 @@ static void latency_record_incoming_ts(TK_P_ struct connection *conn, char *buf,
* Compute the largest amount of data we can send to the channel
* using a single write() call.
*/
static void largest_contiguous_chunk(struct loop_arguments *largs, struct connection *conn, const void **position, size_t *available_length) {
static void largest_contiguous_chunk(struct loop_arguments *largs, struct connection *conn, const void **position, size_t *available_header, size_t *available_body) {
off_t *current_offset = &conn->write_offset;
size_t accessible_size = largs->params.data.total_size;
size_t available = accessible_size - *current_offset;
Expand All @@ -1404,17 +1395,26 @@ static void largest_contiguous_chunk(struct loop_arguments *largs, struct connec
accessible_size = largs->params.data.ws_hdr_size;
size_t available = accessible_size - *current_offset;
*position = largs->params.data.ptr + *current_offset;
*available_length = available;
*available_header = available;
*available_body = 0;
return;
}

if(conn->data_sent < largs->params.data.once_size) {
/* Send header... once per connection lifetime */
*available_header = largs->params.data.once_size - conn->data_sent;
assert(available);
} else {
*available_header = 0; /* Sending body */
}

if(available) {
*position = largs->params.data.ptr + *current_offset;
*available_length = available;
*available_body = available - *available_header;
} else {
size_t off = largs->params.data.once_size;
*position = largs->params.data.ptr + off;
*available_length = accessible_size - off;
*available_body = accessible_size - off;
*current_offset = off;
}
}
Expand Down Expand Up @@ -1499,28 +1499,31 @@ static void connection_cb(TK_P_ tk_io *w, int revents) {

if(revents & TK_WRITE) {
const void *position;
size_t available_length;
size_t available_header, available_body;
int record_moved = 0;

largest_contiguous_chunk(largs, conn, &position, &available_length);
if(!available_length) {
largest_contiguous_chunk(largs, conn, &position, &available_header, &available_body);
if(!(available_header + available_body)) {
/* Only the header was sent. Now, silence. */
assert(largs->params.data.total_size == largs->params.data.once_size
|| largs->params.websocket_enable);
update_io_interest(TK_A_ conn, TK_READ); /* no write interest */
return;
}

/* Adjust (available_length) to avoid sending too much stuff. */
switch(limit_channel_bandwidth(TK_A_ conn, &available_length)) {
/* Adjust (available_body) to avoid sending too much stuff. */
switch(limit_channel_bandwidth(TK_A_ conn, &available_body)) {
case LB_UNLIMITED: record_moved = 0; break;
case LB_PROCEED: record_moved = 1; break;
case LB_GO_SLEEP:
if(available_header)
break;
update_io_interest(TK_A_ conn, TK_READ);
return;
}

ssize_t wrote = write(tk_fd(w), position, available_length);
ssize_t wrote = write(tk_fd(w), position,
available_header + available_body);
if(wrote == -1) {
char buf[INET6_ADDRSTRLEN+64];
switch(errno) {
Expand All @@ -1541,7 +1544,12 @@ static void connection_cb(TK_P_ tk_io *w, int revents) {
pacefier_moved(&conn->bw_pace,
largs->params.channel_bandwidth_Bps,
wrote, tk_now(TK_A));
latency_record_outgoing_ts(TK_A_ conn, &largs->params.data, position, wrote);
wrote -= available_header;
if(wrote > 0) {
/* Record latencies for the body only, not headers */
latency_record_outgoing_ts(TK_A_ conn,
&largs->params.data, position, wrote);
}
}
}

Expand All @@ -1560,15 +1568,32 @@ static void close_all_connections(TK_P_ enum connection_close_reason reason) {
}
}

/*
* Move the connections' stats.data.ptr out into the atomically managed
* thread-specific aggregate counters.
*/
static void connections_flush_stats(TK_P) {
struct loop_arguments *largs = tk_userdata(TK_A);
struct connection *conn;
struct connection *tmpconn;
TAILQ_FOREACH_SAFE(conn, &largs->open_conns, hook, tmpconn) {
TAILQ_FOREACH(conn, &largs->open_conns, hook) {
connection_flush_stats(TK_A_ conn);
}
}

/*
* Add whatever data transfer counters we accumulated in a connection
* back to the worker-wide tally.
*/
static void connection_flush_stats(TK_P_ struct connection *conn) {
struct loop_arguments *largs = tk_userdata(TK_A);
size_t sent_delta = conn->data_sent - conn->data_sent_reported;
size_t rcvd_delta = conn->data_rcvd - conn->data_rcvd_reported;
conn->data_sent_reported = conn->data_sent;
conn->data_rcvd_reported = conn->data_rcvd;
atomic_add(&largs->worker_data_sent, sent_delta);
atomic_add(&largs->worker_data_rcvd, rcvd_delta);
}

static void free_connection_by_handle(tk_io *w) {
struct connection *conn = (struct connection *)((char *)w - offsetof(struct connection, watcher));
free(conn);
Expand Down Expand Up @@ -1662,18 +1687,6 @@ static void close_connection(TK_P_ struct connection *conn, enum connection_clos
tk_close(&conn->watcher, free_connection_by_handle);
}

/*
* Add whatever data transfer counters we accumulated in a connection
* back to the worker-wide tally.
*/
static void connection_flush_stats(TK_P_ struct connection *conn) {
struct loop_arguments *largs = tk_userdata(TK_A);
atomic_add(&largs->worker_data_sent, conn->data_sent);
atomic_add(&largs->worker_data_rcvd, conn->data_rcvd);
conn->data_sent = 0;
conn->data_rcvd = 0;
}

/*
* Determine the amount of parallelism available in this system.
*/
Expand Down

0 comments on commit 9ee4e19

Please sign in to comment.