diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 44088984..2a403f22 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -9,7 +9,7 @@ if (NOT TARGET tlsuv) else () FetchContent_Declare(tlsuv GIT_REPOSITORY https://github.com/openziti/tlsuv.git - GIT_TAG v0.32.0 + GIT_TAG v0.32.1 ) FetchContent_MakeAvailable(tlsuv) endif (tlsuv_DIR) diff --git a/includes/ziti/ziti.h b/includes/ziti/ziti.h index 74223cc0..ec4ecd1b 100644 --- a/includes/ziti/ziti.h +++ b/includes/ziti/ziti.h @@ -1,4 +1,4 @@ -// Copyright (c) 2022-2023. NetFoundry Inc. +// Copyright (c) 2022-2024. NetFoundry Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -643,9 +643,10 @@ extern void ziti_conn_set_data(ziti_connection conn, void *data); * * @param conn * @param cb + * @return ZITI_OK or error code */ ZITI_FUNC -extern void ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb); +extern int ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb); /** * @brief Get the identity of the client that initiated the #ziti_connection. diff --git a/library/conn_bridge.c b/library/conn_bridge.c index 54baf63a..ea590e3a 100644 --- a/library/conn_bridge.c +++ b/library/conn_bridge.c @@ -1,4 +1,4 @@ -// Copyright (c) 2022-2023. NetFoundry Inc. +// Copyright (c) 2022-2024. NetFoundry Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -57,7 +57,7 @@ static int fmt_addr(struct sockaddr_storage *ss, char *host, size_t host_len, in extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_cb on_close) { - if (handle == NULL) return UV_EINVAL; + if (handle == NULL || conn == NULL) return UV_EINVAL; if ( !(handle->type == UV_TCP || handle->type == UV_NAMED_PIPE || handle->type == UV_TTY || handle->type == UV_UDP )) { @@ -74,6 +74,12 @@ extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_ } } + int rc; + if ((rc = ziti_conn_set_data_cb(conn, on_ziti_data)) != ZITI_OK) { + ZITI_LOG(ERROR, "failed to bridge ziti connection: %s", ziti_errorstr(rc)); + return UV_ECONNRESET; + } + NEWP(br, struct ziti_bridge_s); br->conn = conn; br->input = handle; @@ -86,14 +92,14 @@ extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_ ziti_conn_set_data(conn, br); conn->bridged = true; - ziti_conn_set_data_cb(conn, on_ziti_data); - int rc = (br->input->type == UV_UDP) ? - uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input) : - uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input); + rc = (br->input->type == UV_UDP) ? + uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input) : + uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input); if (rc != 0) { BR_LOG(WARN, "failed to start reading handle: %d/%s", rc, uv_strerror(rc)); close_bridge(br); + return UV_ECONNABORTED; } else { BR_LOG(DEBUG, "connected"); } @@ -125,6 +131,8 @@ static void on_pipes_close(uv_handle_t *h) { } extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd_t output, void (*close_cb)(void *ctx), void *ctx) { + if (conn == NULL) return UV_EINVAL; + uv_loop_t *l = ziti_conn_context(conn)->loop; NEWP(fdbr, struct fd_bridge_s); @@ -133,8 +141,8 @@ extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd fdbr->close_cb = close_cb; fdbr->ctx = ctx; - uv_handle_t *sock = NULL; if (input == output) { + uv_handle_t *sock = NULL; int type; socklen_t len = sizeof(type); if (getsockopt(input, SOL_SOCKET, SO_TYPE, &type, &len) == 0) { diff --git a/library/connect.c b/library/connect.c index e6285f79..1444e970 100644 --- a/library/connect.c +++ b/library/connect.c @@ -102,6 +102,25 @@ const char *ziti_conn_state(ziti_connection conn) { return conn ? conn_state_str[conn->state] : ""; } +int ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb) { + if (conn == NULL) return ZITI_INVALID_STATE; + + if (conn->state == Disconnected || conn->state == Closed) { + return ZITI_CONN_CLOSED; + } + + // app already received EOF + if (conn->fin_recv == 2) { + return ZITI_EOF; + } + + conn->data_cb = cb; + if (!TAILQ_EMPTY(&conn->in_q) || buffer_available(conn->inbound) > 0) { + flush_connection(conn); + } + return ZITI_OK; +} + static void conn_set_state(struct ziti_conn *conn, enum conn_state state) { CONN_LOG(VERBOSE, "transitioning %s => %s", conn_state_str[conn->state], conn_state_str[state]); conn->state = state; @@ -869,9 +888,14 @@ static bool flush_to_client(ziti_connection conn) { pool_return_obj(m); } + if (conn->data_cb == NULL) { + CONN_LOG(DEBUG, "no data_cb: can't flush, %zu bytes available", buffer_available(conn->inbound)); + return false; + } + CONN_LOG(VERBOSE, "%zu bytes available", buffer_available(conn->inbound)); int flushes = 128; - while (buffer_available(conn->inbound) > 0 && (flushes--) > 0) { + while (conn->data_cb && buffer_available(conn->inbound) > 0 && (flushes--) > 0) { uint8_t *chunk; ssize_t chunk_len = buffer_get_next(conn->inbound, 16 * 1024, &chunk); ssize_t consumed = conn->data_cb(conn, chunk, chunk_len); @@ -880,6 +904,7 @@ static bool flush_to_client(ziti_connection conn) { if (consumed < 0) { CONN_LOG(WARN, "client indicated error[%zd] accepting data (%zd bytes buffered)", consumed, buffer_available(conn->inbound)); + break; } else if (consumed < chunk_len) { buffer_push_back(conn->inbound, (chunk_len - consumed)); CONN_LOG(VERBOSE, "client stalled: %zd bytes buffered", buffer_available(conn->inbound)); @@ -889,11 +914,11 @@ static bool flush_to_client(ziti_connection conn) { if (buffer_available(conn->inbound) > 0) { CONN_LOG(VERBOSE, "%zu bytes still available", buffer_available(conn->inbound)); - - return true; + // no need to schedule flush if client closed or paused receiving + return conn->data_cb != NULL; } - if (conn->fin_recv == 1) { // if fin was received and all data is flushed, signal EOF + if (conn->fin_recv == 1 && conn->data_cb) { // if fin was received and all data is flushed, signal EOF conn->fin_recv = 2; conn->data_cb(conn, NULL, ZITI_EOF); } @@ -1278,6 +1303,8 @@ int ziti_close(ziti_connection conn, ziti_close_cb close_cb) { if (conn->type == Server) { return ziti_close_server(conn); } + + conn->data_cb = NULL; return ziti_disconnect(conn); } diff --git a/library/ziti.c b/library/ziti.c index 0b90224d..378e1c91 100644 --- a/library/ziti.c +++ b/library/ziti.c @@ -825,12 +825,6 @@ void ziti_conn_set_data(ziti_connection conn, void *data) { } } -void ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb) { - if (conn) { - conn->data_cb = cb; - } -} - const char *ziti_conn_source_identity(ziti_connection conn) { return conn != NULL ? conn->source_identity : NULL; } diff --git a/programs/ziti-prox-c/proxy.c b/programs/ziti-prox-c/proxy.c index 32ac9a87..1088269d 100644 --- a/programs/ziti-prox-c/proxy.c +++ b/programs/ziti-prox-c/proxy.c @@ -161,7 +161,8 @@ static void process_stop(uv_loop_t *loop, struct proxy_app_ctx *app_ctx) { uv_unref((uv_handle_t *) &shutdown_timer); // try to cleanup - ziti_shutdown(app_ctx->ziti); + if (app_ctx->ziti) + ziti_shutdown(app_ctx->ziti); ZITI_LOG(INFO, "exiting"); }