Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 1.x #733

Merged
merged 6 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ if (NOT TARGET tlsuv)
else ()
FetchContent_Declare(tlsuv
GIT_REPOSITORY https://github.com/openziti/tlsuv.git
GIT_TAG v0.32.0
GIT_TAG v0.32.1
)
FetchContent_MakeAvailable(tlsuv)
endif (tlsuv_DIR)
Expand Down
5 changes: 3 additions & 2 deletions includes/ziti/ziti.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022-2023. NetFoundry Inc.
// Copyright (c) 2022-2024. NetFoundry Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -643,9 +643,10 @@ extern void ziti_conn_set_data(ziti_connection conn, void *data);
*
* @param conn
* @param cb
* @return ZITI_OK or error code
*/
ZITI_FUNC
extern void ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb);
extern int ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb);

/**
* @brief Get the identity of the client that initiated the #ziti_connection.
Expand Down
22 changes: 15 additions & 7 deletions library/conn_bridge.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2022-2023. NetFoundry Inc.
// Copyright (c) 2022-2024. NetFoundry Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,7 @@ static int fmt_addr(struct sockaddr_storage *ss, char *host, size_t host_len, in


extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_cb on_close) {
if (handle == NULL) return UV_EINVAL;
if (handle == NULL || conn == NULL) return UV_EINVAL;

if ( !(handle->type == UV_TCP || handle->type == UV_NAMED_PIPE ||
handle->type == UV_TTY || handle->type == UV_UDP )) {
Expand All @@ -74,6 +74,12 @@ extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_
}
}

int rc;
if ((rc = ziti_conn_set_data_cb(conn, on_ziti_data)) != ZITI_OK) {
ZITI_LOG(ERROR, "failed to bridge ziti connection: %s", ziti_errorstr(rc));
return UV_ECONNRESET;
}

NEWP(br, struct ziti_bridge_s);
br->conn = conn;
br->input = handle;
Expand All @@ -86,14 +92,14 @@ extern int ziti_conn_bridge(ziti_connection conn, uv_handle_t *handle, uv_close_
ziti_conn_set_data(conn, br);
conn->bridged = true;

ziti_conn_set_data_cb(conn, on_ziti_data);
int rc = (br->input->type == UV_UDP) ?
uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input) :
uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input);
rc = (br->input->type == UV_UDP) ?
uv_udp_recv_start((uv_udp_t *) br->input, bridge_alloc, on_udp_input) :
uv_read_start((uv_stream_t *) br->input, bridge_alloc, on_input);

if (rc != 0) {
BR_LOG(WARN, "failed to start reading handle: %d/%s", rc, uv_strerror(rc));
close_bridge(br);
return UV_ECONNABORTED;
} else {
BR_LOG(DEBUG, "connected");
}
Expand Down Expand Up @@ -125,6 +131,8 @@ static void on_pipes_close(uv_handle_t *h) {
}

extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd_t output, void (*close_cb)(void *ctx), void *ctx) {
if (conn == NULL) return UV_EINVAL;

uv_loop_t *l = ziti_conn_context(conn)->loop;

NEWP(fdbr, struct fd_bridge_s);
Expand All @@ -133,8 +141,8 @@ extern int ziti_conn_bridge_fds(ziti_connection conn, uv_os_fd_t input, uv_os_fd
fdbr->close_cb = close_cb;
fdbr->ctx = ctx;

uv_handle_t *sock = NULL;
if (input == output) {
uv_handle_t *sock = NULL;
int type;
socklen_t len = sizeof(type);
if (getsockopt(input, SOL_SOCKET, SO_TYPE, &type, &len) == 0) {
Expand Down
35 changes: 31 additions & 4 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,25 @@ const char *ziti_conn_state(ziti_connection conn) {
return conn ? conn_state_str[conn->state] : "<NULL>";
}

int ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb) {
if (conn == NULL) return ZITI_INVALID_STATE;

if (conn->state == Disconnected || conn->state == Closed) {
return ZITI_CONN_CLOSED;
}

// app already received EOF
if (conn->fin_recv == 2) {
return ZITI_EOF;
}

conn->data_cb = cb;
if (!TAILQ_EMPTY(&conn->in_q) || buffer_available(conn->inbound) > 0) {
flush_connection(conn);
}
return ZITI_OK;
}

static void conn_set_state(struct ziti_conn *conn, enum conn_state state) {
CONN_LOG(VERBOSE, "transitioning %s => %s", conn_state_str[conn->state], conn_state_str[state]);
conn->state = state;
Expand Down Expand Up @@ -869,9 +888,14 @@ static bool flush_to_client(ziti_connection conn) {
pool_return_obj(m);
}

if (conn->data_cb == NULL) {
CONN_LOG(DEBUG, "no data_cb: can't flush, %zu bytes available", buffer_available(conn->inbound));
return false;
}

CONN_LOG(VERBOSE, "%zu bytes available", buffer_available(conn->inbound));
int flushes = 128;
while (buffer_available(conn->inbound) > 0 && (flushes--) > 0) {
while (conn->data_cb && buffer_available(conn->inbound) > 0 && (flushes--) > 0) {
uint8_t *chunk;
ssize_t chunk_len = buffer_get_next(conn->inbound, 16 * 1024, &chunk);
ssize_t consumed = conn->data_cb(conn, chunk, chunk_len);
Expand All @@ -880,6 +904,7 @@ static bool flush_to_client(ziti_connection conn) {
if (consumed < 0) {
CONN_LOG(WARN, "client indicated error[%zd] accepting data (%zd bytes buffered)",
consumed, buffer_available(conn->inbound));
break;
} else if (consumed < chunk_len) {
buffer_push_back(conn->inbound, (chunk_len - consumed));
CONN_LOG(VERBOSE, "client stalled: %zd bytes buffered", buffer_available(conn->inbound));
Expand All @@ -889,11 +914,11 @@ static bool flush_to_client(ziti_connection conn) {

if (buffer_available(conn->inbound) > 0) {
CONN_LOG(VERBOSE, "%zu bytes still available", buffer_available(conn->inbound));

return true;
// no need to schedule flush if client closed or paused receiving
return conn->data_cb != NULL;
}

if (conn->fin_recv == 1) { // if fin was received and all data is flushed, signal EOF
if (conn->fin_recv == 1 && conn->data_cb) { // if fin was received and all data is flushed, signal EOF
conn->fin_recv = 2;
conn->data_cb(conn, NULL, ZITI_EOF);
}
Expand Down Expand Up @@ -1278,6 +1303,8 @@ int ziti_close(ziti_connection conn, ziti_close_cb close_cb) {
if (conn->type == Server) {
return ziti_close_server(conn);
}

conn->data_cb = NULL;
return ziti_disconnect(conn);
}

Expand Down
6 changes: 0 additions & 6 deletions library/ziti.c
Original file line number Diff line number Diff line change
Expand Up @@ -825,12 +825,6 @@ void ziti_conn_set_data(ziti_connection conn, void *data) {
}
}

void ziti_conn_set_data_cb(ziti_connection conn, ziti_data_cb cb) {
if (conn) {
conn->data_cb = cb;
}
}

const char *ziti_conn_source_identity(ziti_connection conn) {
return conn != NULL ? conn->source_identity : NULL;
}
Expand Down
3 changes: 2 additions & 1 deletion programs/ziti-prox-c/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ static void process_stop(uv_loop_t *loop, struct proxy_app_ctx *app_ctx) {
uv_unref((uv_handle_t *) &shutdown_timer);

// try to cleanup
ziti_shutdown(app_ctx->ziti);
if (app_ctx->ziti)
ziti_shutdown(app_ctx->ziti);

ZITI_LOG(INFO, "exiting");
}
Expand Down
Loading