Skip to content

Commit

Permalink
Pass notifies from serve processes to xfrd
Browse files Browse the repository at this point in the history
Directly without main or backup-main proxying for it.
  • Loading branch information
wtoorop committed Oct 17, 2024
1 parent 740902a commit 7090635
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 17 deletions.
44 changes: 44 additions & 0 deletions ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -777,3 +777,47 @@ xfrd_handle_ipc_read(struct event* handler, xfrd_state_type* xfrd)
buffer_clear(xfrd->ipc_conn->packet);
}
}

void
xfrd_handle_notify(int fd, short event, void* arg)
{
xfrd_state_type* xfrd = (xfrd_state_type*)arg;
ssize_t r;
uint32_t acl_num;
int32_t acl_xfr;

if (!(event & EV_READ))
return;

/* xfrd->notify_message is really local to the scope of this function,
* but allocated beforehand anyway to prevent claiming more than
* QIOBUFSZ on the stack. There is only a single xfrd event loop,
* therefore it is safe to use this semi global variable.
*/
buffer_clear(xfrd->notify_message);
r = recv(fd, buffer_current( xfrd->notify_message)
, buffer_capacity(xfrd->notify_message), MSG_DONTWAIT);
if(r == -1) {
if(errno != EAGAIN && errno != EINTR && errno != EMSGSIZE) {
log_msg( LOG_ERR
, "xfrd_handle_notify receive failed: %s"
, strerror(errno));
}
return;
} else if(r == 0) {
log_msg(LOG_ERR, "xfrd_handle_notify remote closed connection");
return;
} else if(r < (ssize_t)(sizeof(acl_xfr) + sizeof(acl_num))) {
log_msg(LOG_ERR, "xfrd_handle_notify invalid message size");
return;
}
/* acl_num and acl_xfr are appended to the NOTIFY message */
acl_num = buffer_read_u32_at(xfrd->notify_message,
r - sizeof(acl_xfr) - sizeof(acl_num));
acl_xfr = (int32_t)buffer_read_u32_at(xfrd->notify_message,
r - sizeof(acl_xfr));
buffer_skip(xfrd->notify_message, r - sizeof(acl_xfr) - sizeof(acl_num));
buffer_flip(xfrd->notify_message);
xfrd_handle_passed_packet(xfrd->notify_message, acl_num, acl_xfr);
}

3 changes: 3 additions & 0 deletions ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ void child_handle_parent_command(int fd, short event, void* arg);
*/
void xfrd_handle_ipc(int fd, short event, void* arg);

/* receive incoming notifies received by and from the serve processes */
void xfrd_handle_notify(int fd, short event, void* arg);

/* check if all children have exited in an orderly fashion and set mode */
void parent_check_all_children_exited(struct nsd* nsd);

Expand Down
9 changes: 9 additions & 0 deletions nsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ struct nsd
* simultaneous with new serve childs. */
int *dt_collector_fd_swap;
#endif /* USE_DNSTAP */
/* the pipes from the serve processes to xfrd, for passing through
* NOTIFY messages, arrays of size child_count * 2.
* Kept open for (re-)forks. */
int *serve2xfrd_fd_send, *serve2xfrd_fd_recv;
/* the pipes from the serve processes to the xfrd. Initially
* these point halfway into serve2xfrd_fd_send, but during reload
* the pointer is swapped with serve2xfrd_fd_send so that only one
* serve child will write to the same fd simultaneously. */
int *serve2xfrd_fd_swap;
/* ratelimit for errors, time value */
time_t err_limit_time;
/* ratelimit for errors, packet count */
Expand Down
45 changes: 29 additions & 16 deletions query.c
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,7 @@ answer_notify(struct nsd* nsd, struct query *query)
if((acl_num = acl_check_incoming(zone_opt->pattern->allow_notify, query,
&why)) != -1)
{
sig_atomic_t mode = NSD_PASS_TO_XFRD;
int s = nsd->this_child->parent_fd;
ssize_t r;
uint16_t sz;
uint32_t acl_send = htonl(acl_num);
uint32_t acl_xfr;
Expand All @@ -488,30 +487,44 @@ answer_notify(struct nsd* nsd, struct query *query)
sz = buffer_limit(query->packet);
if(buffer_limit(query->packet) > MAX_PACKET_SIZE)
return query_error(query, NSD_RC_SERVFAIL);
/* forward to xfrd for processing
Note. Blocking IPC I/O, but acl is OK. */
sz = htons(sz);
if(!write_socket(s, &mode, sizeof(mode)) ||
!write_socket(s, &sz, sizeof(sz)) ||
!write_socket(s, buffer_begin(query->packet),
buffer_limit(query->packet)) ||
!write_socket(s, &acl_send, sizeof(acl_send)) ||
!write_socket(s, &acl_xfr, sizeof(acl_xfr))) {
log_msg(LOG_ERR, "error in IPC notify server2main, %s",
/* Temporary append acl_send and acl_xfr after the NOTIFY
* message in the buffer, for sending along to the xfrd */
assert(buffer_capacity(query->packet) >
sz + sizeof(acl_send) + sizeof(acl_xfr));
buffer_set_limit( query->packet
, sz + sizeof(acl_send) + sizeof(acl_xfr));
buffer_write_u32_at(query->packet, sz, acl_send);
buffer_write_u32_at(query->packet, sz + sizeof(acl_send)
, acl_xfr);
r = send( nsd->serve2xfrd_fd_send[nsd->this_child->child_num]
, buffer_begin(query->packet)
, buffer_limit(query->packet)
, MSG_DONTWAIT | MSG_NOSIGNAL
);
/* Restore query->packet buffer by removing the earlier
* appended acl_send and acl_xfr again*/
buffer_set_limit(query->packet, sz);
if(r < 0) {
log_msg(LOG_ERR, "error in IPC notify serve2xfrd, %s",
strerror(errno));
return query_error(query, NSD_RC_SERVFAIL);
} else if(r == 0) {
log_msg(LOG_ERR, "error in IPC notify serve2xfrd, %s",
"xfrd closed the channel");
return query_error(query, NSD_RC_SERVFAIL);
}
if(verbosity >= 1) {
uint32_t serial = 0;
char address[128];
addr2str(&query->client_addr, address, sizeof(address));
if(packet_find_notify_serial(query->packet, &serial))
VERBOSITY(1, (LOG_INFO, "notify for %s from %s serial %u",
VERBOSITY(1, (LOG_INFO, "notify for %s from %s serial %u size %u",
dname_to_string(query->qname, NULL), address,
(unsigned)serial));
(unsigned)serial, (unsigned)sz));
else
VERBOSITY(1, (LOG_INFO, "notify for %s from %s",
dname_to_string(query->qname, NULL), address));
VERBOSITY(1, (LOG_INFO, "notify for %s from %s size %u",
dname_to_string(query->qname, NULL), address,
(unsigned)sz));
}

/* create notify reply - keep same query contents */
Expand Down
65 changes: 64 additions & 1 deletion server.c
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,7 @@ void
server_prepare_xfrd(struct nsd* nsd)
{
char tmpfile[256];
size_t i;
/* create task mmaps */
nsd->mytask = 0;
snprintf(tmpfile, sizeof(tmpfile), "%snsd-xfr-%d/nsd.%u.task.0",
Expand Down Expand Up @@ -1683,6 +1684,47 @@ server_prepare_xfrd(struct nsd* nsd)
nsd;
((struct ipc_handler_conn_data*)nsd->xfrd_listener->user_data)->conn =
xfrd_tcp_create(nsd->region, QIOBUFSZ);
/* setup sockets to pass NOTIFY messages from the serve processes */
nsd->serve2xfrd_fd_send = region_alloc_array(
nsd->region, 2 * nsd->child_count, sizeof(int));
nsd->serve2xfrd_fd_recv= region_alloc_array(
nsd->region, 2 * nsd->child_count, sizeof(int));
for(i=0; i < 2 * nsd->child_count; i++) {
int sv[2];
int bufsz = QIOBUFSZ;
sv[0] = -1; /* For receiving by parent (xfrd) */
sv[1] = -1; /* For sending by child (server childs) */
if(socketpair(AF_UNIX, SOCK_DGRAM
#ifdef SOCK_NONBLOCK
| SOCK_NONBLOCK
#endif
, 0, sv) < 0) {
log_msg(LOG_ERR, "fatal error: cannot create NOTIFY "
"communication channel: %s", strerror(errno));
exit(1);
}
#ifndef SOCK_NONBLOCK
if (fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
log_msg(LOG_ERR, "serve2xfrd receive fd fcntl "
"failed: %s", strerror(errno));
}
if (fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
log_msg(LOG_ERR, "serve2xfrd send fd fcntl "
"failed: %s", strerror(errno));
}
#endif
if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz))) {
log_msg(LOG_ERR, "setting serve2xfrd "
"receive buffer size failed: %s", strerror(errno));
}
if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz))) {
log_msg(LOG_ERR, "setting serve2xfrd "
"send buffer size failed: %s", strerror(errno));
}
nsd->serve2xfrd_fd_recv[i] = sv[0];
nsd->serve2xfrd_fd_send[i] = sv[1];
}
nsd->serve2xfrd_fd_swap = nsd->serve2xfrd_fd_send + nsd->child_count;
}


Expand All @@ -1692,6 +1734,7 @@ server_start_xfrd(struct nsd *nsd, int del_db, int reload_active)
pid_t pid;
int sockets[2] = {0,0};
struct ipc_handler_conn_data *data;
size_t i;

if(nsd->xfrd_listener->fd != -1)
close(nsd->xfrd_listener->fd);
Expand Down Expand Up @@ -1728,6 +1771,14 @@ server_start_xfrd(struct nsd *nsd, int del_db, int reload_active)
* restarted, the reload is using nsd->mytask */
nsd->mytask = 1 - nsd->mytask;

/* close the send site of the serve2xfrd fds */
assert(nsd->serve2xfrd_fd_send < nsd->serve2xfrd_fd_swap);
for(i = 0; i < 2 * nsd->child_count; i++) {
if(nsd->serve2xfrd_fd_send[i] != -1) {
close(nsd->serve2xfrd_fd_send[i]);
nsd->serve2xfrd_fd_send[i] = -1;
}
}
#ifdef HAVE_SETPROCTITLE
setproctitle("xfrd");
#endif
Expand All @@ -1750,6 +1801,13 @@ server_start_xfrd(struct nsd *nsd, int del_db, int reload_active)
log_msg(LOG_ERR, "cannot fcntl pipe: %s", strerror(errno));
}
nsd->xfrd_listener->fd = sockets[0];
/* close the receive site of the serve2xfrd fds */
for(i = 0; i < 2 * nsd->child_count; i++) {
if(nsd->serve2xfrd_fd_recv[i] != -1) {
close(nsd->serve2xfrd_fd_recv[i]);
nsd->serve2xfrd_fd_recv[i] = -1;
}
}
#ifdef HAVE_SETPROCTITLE
setproctitle("main");
#endif
Expand Down Expand Up @@ -2404,6 +2462,9 @@ server_reload(struct nsd *nsd, region_type* server_region, netio_type* netio,
struct quit_sync_event_data cb_data;
struct event signal_event, cmd_event;
struct timeval reload_sync_timeout;
/* For swapping filedescriptors from the serve childs to the xfrd
* and/or the dnstap collector */
int *swap_fd_send;

/* ignore SIGCHLD from the previous server_main that used this pid */
memset(&ign_sigchld, 0, sizeof(ign_sigchld));
Expand Down Expand Up @@ -2491,7 +2552,6 @@ server_reload(struct nsd *nsd, region_type* server_region, netio_type* netio,
sigaction(SIGCHLD, &old_sigchld, NULL);
#ifdef USE_DNSTAP
if (nsd->dt_collector) {
int *swap_fd_send;
DEBUG(DEBUG_IPC,1, (LOG_INFO, "reload: swap dnstap collector pipes"));
/* Swap fd_send with fd_swap so old serve child and new serve
* childs will not write to the same pipe ends simultaneously */
Expand All @@ -2501,6 +2561,9 @@ server_reload(struct nsd *nsd, region_type* server_region, netio_type* netio,

}
#endif
swap_fd_send = nsd->serve2xfrd_fd_send;
nsd->serve2xfrd_fd_send = nsd->serve2xfrd_fd_swap;
nsd->serve2xfrd_fd_swap = swap_fd_send;
/* Start new child processes */
if (server_start_children(nsd, server_region, netio, &nsd->
xfrd_listener->fd) != 0) {
Expand Down
15 changes: 15 additions & 0 deletions xfrd.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ xfrd_init(int socket, struct nsd* nsd, int shortsoa, int reload_active,
pid_t nsd_pid)
{
region_type* region;
size_t i;

assert(xfrd == 0);
/* to setup signalhandling */
Expand Down Expand Up @@ -190,6 +191,20 @@ xfrd_init(int socket, struct nsd* nsd, int shortsoa, int reload_active,
xfrd->need_to_send_shutdown = 0;
xfrd->need_to_send_stats = 0;

xfrd->serve2xfrd = (struct event *) region_alloc_array_zero(
xfrd->region, nsd->child_count * 2, sizeof(struct event));
for(i = 0; i < 2 * nsd->child_count; i++) {
memset(&xfrd->serve2xfrd[i], 0, sizeof(struct event));
event_set(&xfrd->serve2xfrd[i], nsd->serve2xfrd_fd_recv[i],
EV_PERSIST|EV_READ, xfrd_handle_notify, xfrd);
if(event_base_set(xfrd->event_base, &xfrd->serve2xfrd[i]) != 0)
log_msg( LOG_ERR
, "xfrd serve2xfrd: event_base_set failed");
if(event_add(&xfrd->serve2xfrd[i], NULL) != 0)
log_msg(LOG_ERR, "xfrd serve2xfrd: event_add failed");
}
xfrd->notify_message = buffer_create(xfrd->region, QIOBUFSZ);

xfrd->write_zonefile_needed = 0;
if(nsd->options->zonefiles_write)
xfrd_write_timer_set();
Expand Down
6 changes: 6 additions & 0 deletions xfrd.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ struct xfrd_state {
int ipc_handler_flags;
struct xfrd_tcp *ipc_conn;
struct buffer* ipc_pass;

/* 2 * nsd->child_count communication channels with the serve childs */
struct event* serve2xfrd;
/* the message passed down from the serve process */
struct buffer* notify_message;

/* sending ipc to server_main */
uint8_t need_to_send_shutdown;
uint8_t need_to_send_reload;
Expand Down

0 comments on commit 7090635

Please sign in to comment.