Skip to content

Commit

Permalink
Pass notifies from serve processes to xfrd
Browse files Browse the repository at this point in the history
Directly without main or backup-main proxying for it.
  • Loading branch information
wtoorop committed Oct 18, 2024
1 parent 8a9660d commit aeac0b7
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 218 deletions.
209 changes: 33 additions & 176 deletions ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,31 +166,6 @@ send_stat_to_child(struct main_ipc_handler_data* data, int fd)
data->child->need_to_send_STATS = 0;
}

#ifndef NDEBUG
int packet_read_query_section(buffer_type *packet, uint8_t* dest, uint16_t* qtype, uint16_t* qclass);
static void
debug_print_fwd_name(int ATTR_UNUSED(len), buffer_type* packet, int acl_num)
{
uint8_t qnamebuf[MAXDOMAINLEN];
uint16_t qtype, qclass;
const dname_type* dname;
region_type* tempregion = region_create(xalloc, free);

size_t bufpos = buffer_position(packet);
buffer_rewind(packet);
buffer_skip(packet, 12);
if(packet_read_query_section(packet, qnamebuf, &qtype, &qclass)) {
dname = dname_make(tempregion, qnamebuf, 1);
log_msg(LOG_INFO, "main: fwd packet for %s, acl %d",
dname_to_string(dname,0), acl_num);
} else {
log_msg(LOG_INFO, "main: fwd packet badqname, acl %d", acl_num);
}
buffer_set_position(packet, bufpos);
region_destroy(tempregion);
}
#endif

static void
send_quit_to_child(struct main_ipc_handler_data* data, int fd)
{
Expand Down Expand Up @@ -332,95 +307,6 @@ parent_handle_child_command(netio_type *ATTR_UNUSED(netio),
return;
}

if (data->forward_mode) {
int got_acl;
/* forward the data to xfrd */
DEBUG(DEBUG_IPC,2, (LOG_INFO,
"main passed packet readup %d", (int)data->got_bytes));
if(data->got_bytes < sizeof(data->total_bytes))
{
if ((len = read(handler->fd,
(char*)&data->total_bytes+data->got_bytes,
sizeof(data->total_bytes)-data->got_bytes)) == -1) {
log_msg(LOG_ERR, "handle_child_command: read: %s",
strerror(errno));
return;
}
if(len == 0) {
/* EOF */
data->forward_mode = 0;
return;
}
data->got_bytes += len;
if(data->got_bytes < sizeof(data->total_bytes))
return;
data->total_bytes = ntohs(data->total_bytes);
buffer_clear(data->packet);
if(data->total_bytes > buffer_capacity(data->packet)) {
log_msg(LOG_ERR, "internal error: ipc too large");
exit(1);
}
return;
}
/* read the packet */
if(data->got_bytes-sizeof(data->total_bytes) < data->total_bytes) {
if((len = read(handler->fd, buffer_current(data->packet),
data->total_bytes - (data->got_bytes-sizeof(data->total_bytes))
)) == -1 ) {
log_msg(LOG_ERR, "handle_child_command: read: %s",
strerror(errno));
return;
}
if(len == 0) {
/* EOF */
data->forward_mode = 0;
return;
}
data->got_bytes += len;
buffer_skip(data->packet, len);
/* read rest later */
return;
}
/* read the acl numbers */
got_acl = data->got_bytes - sizeof(data->total_bytes) - data->total_bytes;
if((len = read(handler->fd, (char*)&data->acl_num+got_acl,
sizeof(data->acl_num)+sizeof(data->acl_xfr)-got_acl)) == -1 ) {
log_msg(LOG_ERR, "handle_child_command: read: %s",
strerror(errno));
return;
}
if(len == 0) {
/* EOF */
data->forward_mode = 0;
return;
}
got_acl += len;
data->got_bytes += len;
if(got_acl >= (int)(sizeof(data->acl_num)+sizeof(data->acl_xfr))) {
uint16_t len = htons(data->total_bytes);
DEBUG(DEBUG_IPC,2, (LOG_INFO,
"main fwd passed packet write %d", (int)data->got_bytes));
#ifndef NDEBUG
if(nsd_debug_level >= 2)
debug_print_fwd_name(len, data->packet, data->acl_num);
#endif
data->forward_mode = 0;
mode = NSD_PASS_TO_XFRD;
if(!write_socket(*data->xfrd_sock, &mode, sizeof(mode)) ||
!write_socket(*data->xfrd_sock, &len, sizeof(len)) ||
!write_socket(*data->xfrd_sock, buffer_begin(data->packet),
data->total_bytes) ||
!write_socket(*data->xfrd_sock, &data->acl_num,
sizeof(data->acl_num)) ||
!write_socket(*data->xfrd_sock, &data->acl_xfr,
sizeof(data->acl_xfr))) {
log_msg(LOG_ERR, "error in ipc fwd main2xfrd: %s",
strerror(errno));
}
}
return;
}

/* read command from ipc */
if ((len = read(handler->fd, &mode, sizeof(mode))) == -1) {
log_msg(LOG_ERR, "handle_child_command: read: %s",
Expand All @@ -443,12 +329,6 @@ parent_handle_child_command(netio_type *ATTR_UNUSED(netio),
case NSD_REAP_CHILDREN:
data->nsd->signal_hint_child = 1;
break;
case NSD_PASS_TO_XFRD:
/* set mode for handle_child_command; echo to xfrd. */
data->forward_mode = 1;
data->got_bytes = 0;
data->total_bytes = 0;
break;
default:
log_msg(LOG_ERR, "handle_child_command: bad mode %d",
(int) mode);
Expand Down Expand Up @@ -651,52 +531,6 @@ xfrd_handle_ipc_read(struct event* handler, xfrd_state_type* xfrd)
sig_atomic_t cmd;
int len;

if(xfrd->ipc_conn->is_reading==2) {
buffer_type* tmp = xfrd->ipc_pass;
uint32_t acl_num;
int32_t acl_xfr;
/* read acl_num */
int ret = conn_read(xfrd->ipc_conn);
if(ret == -1) {
log_msg(LOG_ERR, "xfrd: error in read ipc: %s", strerror(errno));
xfrd->ipc_conn->is_reading = 0;
return;
}
if(ret == 0)
return;
buffer_flip(xfrd->ipc_conn->packet);
xfrd->ipc_pass = xfrd->ipc_conn->packet;
xfrd->ipc_conn->packet = tmp;
xfrd->ipc_conn->is_reading = 0;
acl_num = buffer_read_u32(xfrd->ipc_pass);
acl_xfr = (int32_t)buffer_read_u32(xfrd->ipc_pass);
xfrd_handle_passed_packet(xfrd->ipc_conn->packet, acl_num, acl_xfr);
return;
}
if(xfrd->ipc_conn->is_reading) {
/* reading an IPC message */
buffer_type* tmp;
int ret = conn_read(xfrd->ipc_conn);
if(ret == -1) {
log_msg(LOG_ERR, "xfrd: error in read ipc: %s", strerror(errno));
xfrd->ipc_conn->is_reading = 0;
return;
}
if(ret == 0)
return;
buffer_flip(xfrd->ipc_conn->packet);
/* use ipc_conn to read remaining data as well */
tmp = xfrd->ipc_pass;
xfrd->ipc_conn->is_reading=2;
xfrd->ipc_pass = xfrd->ipc_conn->packet;
xfrd->ipc_conn->packet = tmp;
xfrd->ipc_conn->total_bytes = sizeof(xfrd->ipc_conn->msglen);
xfrd->ipc_conn->msglen = 2*sizeof(uint32_t);
buffer_clear(xfrd->ipc_conn->packet);
buffer_set_limit(xfrd->ipc_conn->packet, xfrd->ipc_conn->msglen);
return;
}

if((len = read(handler->ev_fd, &cmd, sizeof(cmd))) == -1) {
if(errno != EINTR && errno != EAGAIN)
log_msg(LOG_ERR, "xfrd_handle_ipc: read: %s",
Expand Down Expand Up @@ -747,10 +581,6 @@ xfrd_handle_ipc_read(struct event* handler, xfrd_state_type* xfrd)
xfrd_prepare_zones_for_reload();
xfrd->reload_failed = 0;
break;
case NSD_PASS_TO_XFRD:
DEBUG(DEBUG_IPC,1, (LOG_INFO, "xfrd: ipc recv PASS_TO_XFRD"));
xfrd->ipc_conn->is_reading = 1;
break;
case NSD_RELOAD_REQ:
DEBUG(DEBUG_IPC,1, (LOG_INFO, "xfrd: ipc recv RELOAD_REQ"));
/* make reload happen, right away, and schedule file check */
Expand All @@ -769,11 +599,38 @@ xfrd_handle_ipc_read(struct event* handler, xfrd_state_type* xfrd)
(int)ntohl(cmd));
break;
}
}

if(xfrd->ipc_conn->is_reading) {
/* setup read of info */
xfrd->ipc_conn->total_bytes = 0;
xfrd->ipc_conn->msglen = 0;
buffer_clear(xfrd->ipc_conn->packet);
}
void
xfrd_handle_notify(int ATTR_UNUSED(fd), short event, void* arg)
{
struct xfrd_tcp* notify_pipe = (struct xfrd_tcp*)arg;
uint32_t acl_num;
int32_t acl_xfr;

if(!(event & EV_READ))
return;

switch(conn_read(notify_pipe)){
case -1: /* TODO: What to do here? */
return;
case 0: return; /* call back later */
default: break;
}
if(buffer_limit(notify_pipe->packet) < sizeof(acl_xfr)+sizeof(acl_num))
log_msg(LOG_ERR, "xfrd_handle_notify invalid message size");
else {
size_t eop = buffer_position(notify_pipe->packet)
- sizeof(acl_xfr) - sizeof(acl_num);

buffer_set_position(notify_pipe->packet, eop);
acl_num = buffer_read_u32(notify_pipe->packet);
acl_xfr = (int32_t)buffer_read_u32(notify_pipe->packet);
buffer_set_position(notify_pipe->packet, eop);
buffer_flip(notify_pipe->packet);
xfrd_handle_passed_packet(notify_pipe->packet,acl_num,acl_xfr);
}
notify_pipe->total_bytes = 0;
notify_pipe->msglen = 0;
buffer_clear(notify_pipe->packet);
}
13 changes: 3 additions & 10 deletions ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ struct main_ipc_handler_data
{
struct nsd *nsd;
struct nsd_child *child;
int child_num;

/* pointer to the socket, as it may change if it is restarted */
int *xfrd_sock;
struct buffer *packet;
int forward_mode;
size_t got_bytes;
uint16_t total_bytes;
uint32_t acl_num;
int32_t acl_xfr;
};

/*
Expand Down Expand Up @@ -84,6 +74,9 @@ void child_handle_parent_command(int fd, short event, void* arg);
*/
void xfrd_handle_ipc(int fd, short event, void* arg);

/* receive incoming notifies received by and from the serve processes */
void xfrd_handle_notify(int fd, short event, void* arg);

/* check if all children have exited in an orderly fashion and set mode */
void parent_check_all_children_exited(struct nsd* nsd);

Expand Down
16 changes: 9 additions & 7 deletions nsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ struct dt_collector;
#define NSD_STATS 3
#define NSD_REAP_CHILDREN 4
#define NSD_QUIT 5
/*
* PASS_TO_XFRD is followed by the u16(len in network order) and
* then network packet contents. packet is a notify(acl checked), or
* xfr reply from a master(acl checked).
* followed by u32(acl number that matched from notify/xfr acl).
*/
#define NSD_PASS_TO_XFRD 6
/*
* RELOAD_REQ is sent when parent receives a SIGHUP and tells
* xfrd that it wants to initiate a reload (and thus task swap).
Expand Down Expand Up @@ -354,6 +347,15 @@ struct nsd
* simultaneous with new serve childs. */
int *dt_collector_fd_swap;
#endif /* USE_DNSTAP */
/* the pipes from the serve processes to xfrd, for passing through
* NOTIFY messages, arrays of size child_count * 2.
* Kept open for (re-)forks. */
int *serve2xfrd_fd_send, *serve2xfrd_fd_recv;
/* the pipes from the serve processes to the xfrd. Initially
* these point halfway into serve2xfrd_fd_send, but during reload
* the pointer is swapped with serve2xfrd_fd_send so that only one
* serve child will write to the same fd simultaneously. */
int *serve2xfrd_fd_swap;
/* ratelimit for errors, time value */
time_t err_limit_time;
/* ratelimit for errors, packet count */
Expand Down
9 changes: 4 additions & 5 deletions query.c
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,7 @@ answer_notify(struct nsd* nsd, struct query *query)
if((acl_num = acl_check_incoming(zone_opt->pattern->allow_notify, query,
&why)) != -1)
{
sig_atomic_t mode = NSD_PASS_TO_XFRD;
int s = nsd->this_child->parent_fd;
int s = nsd->serve2xfrd_fd_send[nsd->this_child->child_num];
uint16_t sz;
uint32_t acl_send = htonl(acl_num);
uint32_t acl_xfr;
Expand All @@ -485,14 +484,14 @@ answer_notify(struct nsd* nsd, struct query *query)
why->ip_address_spec,
why->nokey?"NOKEY":
(why->blocked?"BLOCKED":why->key_name)));
sz = buffer_limit(query->packet);
if(buffer_limit(query->packet) > MAX_PACKET_SIZE)
return query_error(query, NSD_RC_SERVFAIL);
/* forward to xfrd for processing
Note. Blocking IPC I/O, but acl is OK. */
sz = buffer_limit(query->packet)
+ sizeof(acl_send) + sizeof(acl_xfr);
sz = htons(sz);
if(!write_socket(s, &mode, sizeof(mode)) ||
!write_socket(s, &sz, sizeof(sz)) ||
if(!write_socket(s, &sz, sizeof(sz)) ||
!write_socket(s, buffer_begin(query->packet),
buffer_limit(query->packet)) ||
!write_socket(s, &acl_send, sizeof(acl_send)) ||
Expand Down
Loading

0 comments on commit aeac0b7

Please sign in to comment.