Skip to content

Commit

Permalink
Merge pull request #732 from openziti/fix-crash-with-no-datacb
Browse files Browse the repository at this point in the history
better handle ziti_conn.data_cb
  • Loading branch information
ekoby authored Sep 29, 2024
2 parents 92c7d63 + 6ed84d8 commit cb0bc3a
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 24 deletions.
2 changes: 1 addition & 1 deletion deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ else ()

FetchContent_Declare(tlsuv
GIT_REPOSITORY https://github.com/openziti/tlsuv.git
GIT_TAG v0.31.4
GIT_TAG v0.32.1
)
FetchContent_MakeAvailable(tlsuv)

Expand Down
5 changes: 3 additions & 2 deletions includes/ziti/ziti.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022-2023. NetFoundry Inc.
// Copyright (c) 2022-2024. NetFoundry Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -644,9 +644,10 @@ extern void ziti_conn_set_data(ziti_connection conn, void *data);
*
* @param conn
* @param cb
* @return ZITI_OK or error code
*/
ZITI_FUNC
extern void ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb);
extern int ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb);

/**
* @brief Get the identity of the client that initiated the #ziti_connection.
Expand Down
22 changes: 15 additions & 7 deletions library/conn_bridge.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022-2023. NetFoundry Inc.
// Copyright (c) 2022-2024. NetFoundry Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,7 @@ static int fmt_addr(struct sockaddr_storage *ss, char *host, size_t host_len, in


extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_cb on_close) {
if (handle == NULL) return UV_EINVAL;
if (handle == NULL || conn == NULL) return UV_EINVAL;

if ( !(handle->type == UV_TCP || handle->type == UV_NAMED_PIPE ||
handle->type == UV_TTY || handle->type == UV_UDP )) {
Expand All @@ -74,6 +74,12 @@ extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_
}
}

int rc;
if ((rc = ziti_conn_set_data_cb(conn, on_ziti_data)) != ZITI_OK) {
ZITI_LOG(ERROR, "failed to bridge ziti connection: %s", ziti_errorstr(rc));
return UV_ECONNRESET;
}

NEWP(br, struct ziti_bridge_s);
br->conn = conn;
br->input = handle;
Expand All @@ -86,14 +92,14 @@ extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_
ziti_conn_set_data(conn, br);
conn->bridged = true;

ziti_conn_set_data_cb(conn, on_ziti_data);
int rc = (br->input->type == UV_UDP) ?
uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input) :
uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input);
rc = (br->input->type == UV_UDP) ?
uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input) :
uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input);

if (rc != 0) {
BR_LOG(WARN, "failed to start reading handle: %d/%s", rc, uv_strerror(rc));
close_bridge(br);
return UV_ECONNABORTED;
} else {
BR_LOG(DEBUG, "connected");
}
Expand Down Expand Up @@ -125,6 +131,8 @@ static void on_pipes_close(uv_handle_t *h) {
}

extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd_t output, void (*close_cb)(void *ctx), void *ctx) {
if (conn == NULL) return UV_EINVAL;

uv_loop_t *l = ziti_conn_context(conn)->loop;

NEWP(fdbr, struct fd_bridge_s);
Expand All @@ -133,8 +141,8 @@ extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd
fdbr->close_cb = close_cb;
fdbr->ctx = ctx;

uv_handle_t *sock = NULL;
if (input == output) {
uv_handle_t *sock = NULL;
int type;
socklen_t len = sizeof(type);
if (getsockopt(input, SOL_SOCKET, SO_TYPE, &type, &len) == 0) {
Expand Down
37 changes: 32 additions & 5 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@ const char *ziti_conn_state(ziti_connection conn) {
return conn ? conn_state_str[conn->state] : "<NULL>";
}

int ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb) {
if (conn == NULL) return ZITI_INVALID_STATE;

if (conn->state == Disconnected || conn->state == Closed) {
return ZITI_CONN_CLOSED;
}

// app already received EOF
if (conn->fin_recv == 2) {
return ZITI_EOF;
}

conn->data_cb = cb;
if (!TAILQ_EMPTY(&conn->in_q) || buffer_available(conn->inbound) > 0) {
flush_connection(conn);
}
return ZITI_OK;
}

static void conn_set_state(struct ziti_conn *conn, enum conn_state state) {
CONN_LOG(VERBOSE, "transitioning %s => %s", conn_state_str[conn->state], conn_state_str[state]);
conn->state = state;
Expand Down Expand Up @@ -808,9 +827,14 @@ static bool flush_to_client(ziti_connection conn) {
pool_return_obj(m);
}

if (conn->data_cb == NULL) {
CONN_LOG(DEBUG, "no data_cb: can't flush, %zu bytes available", buffer_available(conn->inbound));
return false;
}

CONN_LOG(VERBOSE, "%zu bytes available", buffer_available(conn->inbound));
int flushes = 128;
while (buffer_available(conn->inbound) > 0 && (flushes--) > 0) {
while (conn->data_cb && buffer_available(conn->inbound) > 0 && (flushes--) > 0) {
uint8_t *chunk;
ssize_t chunk_len = buffer_get_next(conn->inbound, 16 * 1024, &chunk);
ssize_t consumed = conn->data_cb(conn, chunk, chunk_len);
Expand All @@ -819,6 +843,7 @@ static bool flush_to_client(ziti_connection conn) {
if (consumed < 0) {
CONN_LOG(WARN, "client indicated error[%zd] accepting data (%zd bytes buffered)",
consumed, buffer_available(conn->inbound));
break;
} else if (consumed < chunk_len) {
buffer_push_back(conn->inbound, (chunk_len - consumed));
CONN_LOG(VERBOSE, "client stalled: %zd bytes buffered", buffer_available(conn->inbound));
Expand All @@ -828,11 +853,11 @@ static bool flush_to_client(ziti_connection conn) {

if (buffer_available(conn->inbound) > 0) {
CONN_LOG(VERBOSE, "%zu bytes still available", buffer_available(conn->inbound));

return true;
// no need to schedule flush if client closed or paused receiving
return conn->data_cb != NULL;
}

if (conn->fin_recv == 1) { // if fin was received and all data is flushed, signal EOF
if (conn->fin_recv == 1 && conn->data_cb) { // if fin was received and all data is flushed, signal EOF
conn->fin_recv = 2;
conn->data_cb(conn, NULL, ZITI_EOF);
}
Expand Down Expand Up @@ -913,7 +938,7 @@ void conn_inbound_data_msg(ziti_connection conn, message *msg) {

int32_t flags;
if (message_get_int32_header(msg, FlagsHeader, &flags) && (flags & EDGE_FIN)) {
conn->fin_recv = true;
conn->fin_recv = 1;
}
}

Expand Down Expand Up @@ -1217,6 +1242,8 @@ int ziti_close(ziti_connection conn, ziti_close_cb close_cb) {
if (conn->type == Server) {
return ziti_close_server(conn);
}

conn->data_cb = NULL;
return ziti_disconnect(conn);
}

Expand Down
6 changes: 0 additions & 6 deletions library/ziti.c
Original file line number Diff line number Diff line change
Expand Up @@ -774,12 +774,6 @@ void ziti_conn_set_data(ziti_connection conn, void *data) {
}
}

void ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb) {
if (conn) {
conn->data_cb = cb;
}
}

const char *ziti_conn_source_identity(ziti_connection conn) {
return conn != NULL ? conn->source_identity : NULL;
}
Expand Down
5 changes: 2 additions & 3 deletions programs/ziti-prox-c/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ static void free_listener(struct listener *l) {

static void process_stop(uv_loop_t *loop, struct proxy_app_ctx *app_ctx) {
ZITI_LOG(INFO, "stopping");
PREPF(uv, uv_strerror);

// shutdown listeners
MODEL_MAP_FOR(it, app_ctx->listeners) {
Expand All @@ -151,9 +150,9 @@ static void process_stop(uv_loop_t *loop, struct proxy_app_ctx *app_ctx) {
uv_unref((uv_handle_t *) &shutdown_timer);

// try to cleanup
ziti_shutdown(app_ctx->ziti);
if (app_ctx->ziti)
ziti_shutdown(app_ctx->ziti);

CATCH(uv) {}
ZITI_LOG(INFO, "exiting");
}

Expand Down

0 comments on commit cb0bc3a

Please sign in to comment.