Diffstat (limited to 'src/knot/server/xfr-handler.c')
-rw-r--r--  src/knot/server/xfr-handler.c  417
1 file changed, 246 insertions(+), 171 deletions(-)
diff --git a/src/knot/server/xfr-handler.c b/src/knot/server/xfr-handler.c
index c7c1abf..90b0a42 100644
--- a/src/knot/server/xfr-handler.c
+++ b/src/knot/server/xfr-handler.c
@@ -44,9 +44,9 @@
#include "libknot/rrset.h"
/* Constants */
-#define XFR_CHUNKLEN 3 /*! Number of requests assigned in a single pass. */
+#define XFR_MAX_TASKS 1024 /*! Maximum pending tasks. */
+#define XFR_CHUNKLEN 16 /*! Number of requests assigned in a single pass. */
#define XFR_SWEEP_INTERVAL 2 /*! [seconds] between sweeps. */
-#define XFR_BUFFER_SIZE 65535 /*! Do not change this - maximum value for UDP packet length. */
#define XFR_MSG_DLTTR 9 /*! Index of letter differentiating IXFR/AXFR in log msg. */
/* Messages */
@@ -62,14 +62,37 @@ static knot_lookup_table_t xfr_type_table[] = {
{ XFR_TYPE_AIN, NULL }
};
static knot_lookup_table_t xfr_result_table[] = {
- { XFR_TYPE_AIN, "Started" },
- { XFR_TYPE_IIN, "Started" },
- { XFR_TYPE_SOA, "Query issued" },
- { XFR_TYPE_NOTIFY, "Query issued" },
- { XFR_TYPE_FORWARD, "Forwarded query" },
+ { XFR_TYPE_AIN, "Started." },
+ { XFR_TYPE_IIN, "Started." },
+ { XFR_TYPE_SOA, "Query issued." },
+ { XFR_TYPE_NOTIFY, "Query issued." },
+ { XFR_TYPE_FORWARD, "Forwarded query." },
{ XFR_TYPE_AIN, NULL }
};
+/* Limits. */
+static bool xfr_pending_incr(xfrhandler_t *xfr)
+{
+ bool ret = false;
+ pthread_mutex_lock(&xfr->pending_mx);
+ rcu_read_lock();
+ if (xfr->pending < conf()->xfers) {
+ ++xfr->pending;
+ ret = true;
+ }
+ rcu_read_unlock();
+ pthread_mutex_unlock(&xfr->pending_mx);
+
+ return ret;
+}
+
+static void xfr_pending_decr(xfrhandler_t *xfr)
+{
+ pthread_mutex_lock(&xfr->pending_mx);
+ --xfr->pending;
+ pthread_mutex_unlock(&xfr->pending_mx);
+}
+
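For orientation, a minimal sketch of the admission gate these two helpers implement: a producer takes a slot before starting a transfer and must return it on every exit path, or the conf()->xfers budget leaks. start_transfer() below is a hypothetical stand-in for the actual task startup.

    /* Sketch: bounded admission around one unit of work. */
    if (!xfr_pending_incr(xfr))
            return;                     /* Budget exhausted, retry later. */
    if (start_transfer(rq) != KNOT_EOK)
            xfr_pending_decr(xfr);      /* Startup failed, return the slot. */
    /* On success the slot is held until the task ends, where
     * xfr_pending_decr() must run exactly once. */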
/* I/O wrappers */
static int xfr_send_tcp(int fd, sockaddr_t *addr, uint8_t *msg, size_t msglen)
@@ -84,28 +107,6 @@ static int xfr_recv_tcp(int fd, sockaddr_t *addr, uint8_t *buf, size_t buflen)
static int xfr_recv_udp(int fd, sockaddr_t *addr, uint8_t *buf, size_t buflen)
{ return recvfrom(fd, buf, buflen, 0, (struct sockaddr *)addr, &addr->len); }
-/* Context fetching. */
-
-static knot_ns_xfr_t* xfr_task_get(xfrworker_t *w, int fd)
-{
- value_t *val = ahtable_tryget(w->pool.t, (const char*)&fd, sizeof(int));
- if (!val) return NULL;
- return *val;
-}
-
-static void xfr_task_set(xfrworker_t *w, int fd, knot_ns_xfr_t *rq)
-{
- fdset_add(w->pool.fds, fd, OS_EV_READ);
- value_t *val = ahtable_get(w->pool.t, (const char*)&fd, sizeof(int));
- *val = rq;
-}
-
-static void xfr_task_clear(xfrworker_t *w, int fd)
-{
- ahtable_del(w->pool.t, (const char*)&fd, sizeof(int));
- fdset_remove(w->pool.fds, fd);
-}
-
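The three helpers removed above kept a separate fd-to-task hash table alongside the fd set. The rest of this patch stores the task pointer directly in the fdset slot instead, so lookup, registration and removal all share one index. A rough sketch of the replacement pattern, using the fdset calls as they appear later in this diff:

    int id = fdset_add(set, rq->session, POLLIN, rq);     /* rq lands in set->ctx[id] */
    knot_ns_xfr_t *task = (knot_ns_xfr_t *)set->ctx[id];  /* O(1), no hashing */
    fdset_remove(set, id);                                /* drops pfd[id] and ctx[id] together */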
/*! \brief Wrapper function for answering AXFR/OUT. */
static int xfr_answer_axfr(knot_nameserver_t *ns, knot_ns_xfr_t *xfr)
{
@@ -241,10 +242,8 @@ static int xfr_task_setsig(knot_ns_xfr_t *rq, knot_tsig_key_t *key)
static int xfr_task_connect(knot_ns_xfr_t *rq)
{
/* Create socket by type. */
- int stype = SOCK_DGRAM;
- if (rq->flags & XFR_FLAG_TCP) {
- stype = SOCK_STREAM;
- }
+ int ret = 0;
+ int stype = (rq->flags & XFR_FLAG_TCP) ? SOCK_STREAM : SOCK_DGRAM;
int fd = socket_create(sockaddr_family(&rq->addr), stype, 0);
if (fd < 0) {
return KNOT_ERROR;
@@ -257,14 +256,22 @@ static int xfr_task_connect(knot_ns_xfr_t *rq)
return KNOT_ERROR;
}
}
+
/* Connect if TCP. */
if (rq->flags & XFR_FLAG_TCP) {
- if (connect(fd, (struct sockaddr *)&rq->addr, rq->addr.len) < 0) {
+ if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
+ ; /* Fall back to blocking mode silently if it fails. */
+
+ ret = connect(fd, (struct sockaddr *)&rq->addr, rq->addr.len);
+ if (ret != 0 && errno != EINPROGRESS) {
socket_close(fd);
return KNOT_ECONNREFUSED;
}
}
+ /* Set up for UDP as well to trigger 'send query' event. */
+ rq->flags |= XFR_FLAG_CONNECTING;
+
/* Store new socket descriptor. */
rq->session = fd;
return KNOT_EOK;
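The connect above follows the standard non-blocking pattern: set O_NONBLOCK, start the connect, and treat EINPROGRESS as "handshake in flight"; completion is observed later through poll(POLLOUT) and SO_ERROR (see xfr_async_state() below). A self-contained sketch of the initiating half:

    #include <errno.h>
    #include <fcntl.h>
    #include <sys/socket.h>

    static int connect_nonblock(int fd, const struct sockaddr *sa, socklen_t salen)
    {
            if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
                    ; /* Degrade to a blocking connect. */
            if (connect(fd, sa, salen) != 0 && errno != EINPROGRESS)
                    return -1; /* Immediate failure. */
            return 0;          /* Connected, or handshake in flight. */
    }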
@@ -299,15 +306,12 @@ static void xfr_task_cleanup(knot_ns_xfr_t *rq)
hattrie_clear(rq->lookup_tree);
}
-/*! \brief Close and free task. */
-static int xfr_task_close(xfrworker_t *w, int fd)
+/*! \brief End task properly and free it. */
+static int xfr_task_close(knot_ns_xfr_t *rq)
{
- knot_ns_xfr_t *rq = xfr_task_get(w, fd);
- if (!rq) return KNOT_ENOENT;
-
/* Update xfer state. */
+ zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
if (rq->type == XFR_TYPE_AIN || rq->type == XFR_TYPE_IIN) {
- zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
pthread_mutex_lock(&zd->lock);
if (zd->xfr_in.state == XFR_PENDING) {
zd->xfr_in.state = XFR_IDLE;
@@ -315,15 +319,25 @@ static int xfr_task_close(xfrworker_t *w, int fd)
pthread_mutex_unlock(&zd->lock);
}
+ /* Reschedule failed bootstrap. */
+ if (rq->type == XFR_TYPE_AIN && !knot_zone_contents(rq->zone)) {
+ int tmr_s = AXFR_BOOTSTRAP_RETRY * tls_rand();
+ event_t *ev = zd->xfr_in.timer;
+ if (ev) {
+ evsched_cancel(ev->parent, ev);
+ evsched_schedule(ev->parent, ev, tmr_s);
+ }
+ log_zone_notice("%s Bootstrap failed, next attempt in %d seconds.\n",
+ rq->msg, tmr_s / 1000);
+ }
+
/* Close socket and free task. */
- xfr_task_clear(w, fd);
xfr_task_free(rq);
- socket_close(fd);
return KNOT_EOK;
}
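A note on the reschedule above: despite its name, tmr_s holds milliseconds (hence the division by 1000 when logging seconds), and the tls_rand() factor jitters the retry so zones bootstrapping from the same master do not hammer it in lockstep. Condensed, assuming AXFR_BOOTSTRAP_RETRY is a millisecond base:

    int tmr_ms = AXFR_BOOTSTRAP_RETRY * tls_rand(); /* 0 .. RETRY, in ms */
    evsched_cancel(ev->parent, ev);                 /* Drop the stale timer. */
    evsched_schedule(ev->parent, ev, tmr_ms);       /* Re-arm with jitter. */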
/*! \brief Timeout handler. */
-static int xfr_task_expire(fdset_t *fds, knot_ns_xfr_t *rq)
+static int xfr_task_expire(fdset_t *set, int i, knot_ns_xfr_t *rq)
{
/* Fetch related zone (refcounted, no RCU). */
knot_zone_t *zone = (knot_zone_t *)rq->zone;
@@ -335,7 +349,7 @@ static int xfr_task_expire(fdset_t *fds, knot_ns_xfr_t *rq)
case XFR_TYPE_NOTIFY:
if ((long)--rq->data > 0) { /* Retries */
notify_create_request(contents, rq->wire, &rq->wire_size);
- fdset_set_watchdog(fds, rq->session, NOTIFY_TIMEOUT);
+ fdset_set_watchdog(set, i, NOTIFY_TIMEOUT);
rq->send(rq->session, &rq->addr, rq->wire, rq->wire_size);
log_zone_info("%s Query issued (serial %u).\n",
rq->msg, knot_zone_serial(contents));
@@ -370,14 +384,6 @@ static int xfr_task_start(knot_ns_xfr_t *rq)
return KNOT_ECONNREFUSED;
}
- /* Connect to remote. */
- if (rq->session <= 0) {
- ret = xfr_task_connect(rq);
- if (ret != KNOT_EOK) {
- return ret;
- }
- }
-
/* Prepare TSIG key if set. */
int add_tsig = 0;
if (rq->tsig_key) {
@@ -437,20 +443,13 @@ static int xfr_task_start(knot_ns_xfr_t *rq)
return KNOT_EOK;
}
-static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, size_t buflen)
+static int xfr_task_is_transfer(knot_ns_xfr_t *rq)
{
- /* Check if not already processing, zone is refcounted. */
- zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
-
- /* Check if the zone is not discarded. */
- if (!zd || knot_zone_flags(rq->zone) & KNOT_ZONE_DISCARDED) {
- dbg_xfr_verb("xfr: request on a discarded zone, ignoring\n");
- return KNOT_EINVAL;
- }
-
- /* Update XFR message prefix. */
- xfr_task_setmsg(rq, NULL);
+ return rq->type == XFR_TYPE_AIN || rq->type == XFR_TYPE_IIN;
+}
+static void xfr_async_setbuf(knot_ns_xfr_t *rq, uint8_t *buf, size_t buflen)
+{
/* Update request. */
rq->wire = buf;
rq->wire_size = buflen;
@@ -461,17 +460,81 @@ static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, siz
rq->send = &xfr_send_tcp;
rq->recv = &xfr_recv_tcp;
}
+}
+
+static int xfr_async_start(fdset_t *set, knot_ns_xfr_t *rq)
+{
+ /* Update XFR message prefix. */
+ int ret = KNOT_EOK;
+ xfr_task_setmsg(rq, NULL);
+
+ /* Connect to remote. */
+ if (rq->session <= 0)
+ ret = xfr_task_connect(rq);
+
+ /* Add to set. */
+ if (ret == KNOT_EOK) {
+ unsigned flags = POLLIN;
+ if (rq->flags & XFR_FLAG_CONNECTING)
+ flags = POLLOUT;
+ int next_id = fdset_add(set, rq->session, flags, rq);
+ if (next_id >= 0) {
+ /* Set default connection timeout. */
+ rcu_read_lock();
+ fdset_set_watchdog(set, next_id, conf()->max_conn_reply);
+ rcu_read_unlock();
+ } else {
+ /* Or refuse if failed. */
+ ret = KNOT_ECONNREFUSED;
+ }
+ }
+
+ return ret;
+}
+
+static int xfr_async_state(knot_ns_xfr_t *rq)
+{
+ /* Check socket status. */
+ int err = EINVAL;
+ socklen_t len = sizeof(int);
+ if (getsockopt(rq->session, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
+ return KNOT_ERROR;
+ if (err != 0)
+ return knot_map_errno(err);
+ return KNOT_EOK;
+}
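This is the completing half of the non-blocking connect started in xfr_task_connect(): once poll() flags the socket writable, SO_ERROR yields the handshake result without a second connect() call. A minimal usage sketch of the happy path, mirroring xfr_async_finish() below:

    if (xfr_async_state(rq) == KNOT_EOK) {
            fcntl(rq->session, F_SETFL, 0);  /* Back to blocking I/O. */
            /* ...and switch poll interest from POLLOUT to POLLIN. */
    } else {
            /* SO_ERROR carried the errno, e.g. ECONNREFUSED. */
    }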
+
+static int xfr_async_finish(fdset_t *set, unsigned id)
+{
+ /* Drop back to synchronous mode. */
+ int ret = KNOT_EOK;
+ knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set->ctx[id];
+ if ((ret = xfr_async_state(rq)) == KNOT_EOK) {
+ rq->flags &= ~XFR_FLAG_CONNECTING;
+ set->pfd[id].events = POLLIN;
+ if (fcntl(set->pfd[id].fd, F_SETFL, 0) < 0)
+ ; /* Stay non-blocking if clearing the flag fails. */
+ } else {
+ /* Do not attempt to start on broken connection. */
+ return KNOT_ECONNREFUSED;
+ }
+
+ /* Check if the zone is not discarded. */
+ zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
+ if (!zd || knot_zone_flags(rq->zone) & KNOT_ZONE_DISCARDED) {
+ dbg_xfr_verb("xfr: request on a discarded zone, ignoring\n");
+ return KNOT_EINVAL;
+ }
/* Handle request. */
dbg_xfr("%s processing request type '%d'\n", rq->msg, rq->type);
- int ret = xfr_task_start(rq);
+ ret = xfr_task_start(rq);
const char *msg = knot_strerror(ret);
knot_lookup_table_t *xd = knot_lookup_by_id(xfr_result_table, rq->type);
if (xd && ret == KNOT_EOK) {
msg = xd->name;
}
- int bootstrap_fail = 0;
switch(rq->type) {
case XFR_TYPE_AIN:
case XFR_TYPE_IIN:
@@ -480,36 +543,28 @@ static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, size_t buflen)
pthread_mutex_lock(&zd->lock);
zd->xfr_in.state = XFR_IDLE;
pthread_mutex_unlock(&zd->lock);
- bootstrap_fail = !knot_zone_contents(rq->zone);
}
break;
- case XFR_TYPE_NOTIFY: /* Send on first timeout <0,5>s. */
- fdset_set_watchdog(w->pool.fds, rq->session, (int)(tls_rand() * 5));
- xfr_task_set(w, rq->session, rq);
+ case XFR_TYPE_NOTIFY:
+ /* This is a bit of a hack to adapt NOTIFY lifetime tracking.
+ * When a NOTIFY event enters the handler, it shouldn't be sent
+ * immediately. To accommodate this, a random <0, 5>s delay is set
+ * at event startup, so the first query fires when this timer
+ * expires. */
+ fdset_set_watchdog(set, id, (int)(tls_rand() * 5));
return KNOT_EOK;
case XFR_TYPE_SOA:
case XFR_TYPE_FORWARD:
- fdset_set_watchdog(w->pool.fds, rq->session, conf()->max_conn_reply);
+ fdset_set_watchdog(set, id, conf()->max_conn_reply);
break;
default:
break;
}
if (ret == KNOT_EOK) {
- xfr_task_set(w, rq->session, rq);
- log_server_info("%s %s.\n", rq->msg, msg);
- } else if (bootstrap_fail){
- int tmr_s = AXFR_BOOTSTRAP_RETRY * tls_rand();
- event_t *ev = zd->xfr_in.timer;
- if (ev) {
- evsched_cancel(ev->parent, ev);
- evsched_schedule(ev->parent, ev, tmr_s);
- }
- log_zone_notice("%s Bootstrap failed, next "
- "attempt in %d seconds.\n",
- rq->msg, tmr_s / 1000);
+ log_server_info("%s %s\n", rq->msg, msg);
} else {
- log_server_error("%s %s.\n", rq->msg, msg);
+ log_server_error("%s %s\n", rq->msg, msg);
}
return ret;
@@ -745,16 +800,11 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
/* Only for successful xfers. */
if (ret > 0) {
ret = xfr_task_finalize(w, rq);
- if (ret != KNOT_EOK && !knot_zone_contents(rq->zone)) {
-
- /* AXFR bootstrap timeout. */
- zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
- int tmr_s = AXFR_BOOTSTRAP_RETRY * tls_rand();
- zd->xfr_in.bootstrap_retry = tmr_s;
- log_zone_info("%s Next attempt to bootstrap "
- "in %d seconds.\n",
- rq->msg, tmr_s / 1000);
- } else if (ret == KNOT_EBUSY && rq->type == XFR_TYPE_IIN) {
+
+ /* EBUSY on an incremental transfer has a special meaning: the
+ * journal cannot free up space for the incoming transfer, which
+ * forces a fallback to a full zone transfer. */
+ if (ret == KNOT_EBUSY && rq->type == XFR_TYPE_IIN) {
return xfr_start_axfr(w, rq, diff_nospace_msg);
} else {
@@ -763,7 +813,7 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
}
/* Update REFRESH/RETRY */
- zones_schedule_refresh(rq->zone, 0);
+ zones_schedule_refresh(rq->zone, REFRESH_DEFAULT);
ret = KNOT_ECONNREFUSED; /* Disconnect */
}
@@ -801,37 +851,36 @@ static int xfr_process_event(xfrworker_t *w, knot_ns_xfr_t *rq)
}
}
-/*! \brief Sweep non-replied connection. */
-static void xfr_sweep(fdset_t *set, int fd, void *data)
+/*! \brief Sweep inactive connection. */
+static enum fdset_sweep_state xfr_sweep(fdset_t *set, int i, void *data)
{
- dbg_xfr("xfr: sweeping fd=%d\n", fd);
- if (!set || !data) {
- dbg_xfr("xfr: invalid sweep operation on NULL worker or set\n");
- return;
- }
- xfrworker_t *w = (xfrworker_t *)data;
- knot_ns_xfr_t *rq = xfr_task_get(w, fd);
- if (!rq) {
- dbg_xfr("xfr: NULL data to sweep\n");
- return;
- }
+ assert(set && i < set->n && i >= 0);
+
+ knot_ns_xfr_t *rq = set->ctx[i];
+ xfrhandler_t *xfr = (xfrhandler_t *)data;
- /* Skip non-sweepable types. */
+ /* Expire only UDP requests. */
int ret = KNOT_ECONNREFUSED;
switch(rq->type) {
case XFR_TYPE_SOA:
case XFR_TYPE_NOTIFY:
case XFR_TYPE_FORWARD:
- ret = xfr_task_expire(set, rq);
+ ret = xfr_task_expire(set, i, rq);
break;
default:
break;
}
+ /* Close if not valid anymore. */
if (ret != KNOT_EOK) {
- xfr_task_close(w, fd);
- --w->pending;
+ if (xfr_task_is_transfer(rq))
+ xfr_pending_decr(xfr);
+ xfr_task_close(rq);
+ socket_close(set->pfd[i].fd);
+ return FDSET_SWEEP;
}
+
+ return FDSET_KEEP;
}
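Judging by its use here, fdset_sweep() visits descriptors whose fdset_set_watchdog() timer has expired and drops the slots for which the callback answers FDSET_SWEEP; closing the descriptor and freeing task state stay the callback's job. A skeletal callback, with still_wanted() as a hypothetical predicate:

    static enum fdset_sweep_state my_sweep(fdset_t *set, int i, void *data)
    {
            knot_ns_xfr_t *rq = set->ctx[i];
            if (still_wanted(rq))
                    return FDSET_KEEP;    /* Leave the slot alone. */
            xfr_task_close(rq);           /* Free task state... */
            socket_close(set->pfd[i].fd); /* ...and the descriptor. */
            return FDSET_SWEEP;           /* fdset removes slot i. */
    }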
/*! \brief Check TSIG if exists. */
@@ -969,7 +1018,7 @@ int xfr_worker(dthread_t *thread)
xfrhandler_t *xfr = w->master;
/* Buffer for answering. */
- size_t buflen = XFR_BUFFER_SIZE;
+ size_t buflen = SOCKET_MTU_SZ;
uint8_t* buf = malloc(buflen);
if (buf == NULL) {
dbg_xfr("xfr: failed to allocate buffer for XFR worker\n");
@@ -981,84 +1030,113 @@ int xfr_worker(dthread_t *thread)
time_now(&next_sweep);
next_sweep.tv_sec += XFR_SWEEP_INTERVAL;
- int limit = XFR_CHUNKLEN * 3;
- if (conf() && conf()->xfers > 0) {
- limit = conf()->xfers;
+ /* Approximate thread capacity limits. */
+ unsigned threads = w->master->unit->size;
+ unsigned thread_capacity = XFR_MAX_TASKS / threads;
+
+ /* Set of connections. */
+ fdset_t set;
+ int ret = fdset_init(&set, thread_capacity);
+ if (ret != KNOT_EOK) {
+ free(buf);
+ return ret;
}
- unsigned thread_capacity = limit / w->master->unit->size;
- if (thread_capacity < 1) thread_capacity = 1;
- w->pool.fds = fdset_new();
- w->pool.t = ahtable_create();
- w->pending = 0;
/* Accept requests. */
- int ret = 0;
dbg_xfr_verb("xfr: worker=%p starting\n", w);
for (;;) {
/* Populate pool with new requests. */
- if (w->pending <= thread_capacity) {
- pthread_mutex_lock(&xfr->mx);
- unsigned was_pending = w->pending;
- while (!EMPTY_LIST(xfr->queue)) {
- knot_ns_xfr_t *rq = HEAD(xfr->queue);
- rem_node(&rq->n);
+ unsigned newconns = 0;
+ for (;;) {
+ /* Do not exceed thread capacity. */
+ if (set.n >= thread_capacity || newconns > XFR_CHUNKLEN)
+ break;
- /* Unlock queue and process request. */
+ /* Take first request. */
+ pthread_mutex_lock(&xfr->mx);
+ if (EMPTY_LIST(xfr->queue)) {
pthread_mutex_unlock(&xfr->mx);
- ret = xfr_task_process(w, rq, buf, buflen);
- if (ret == KNOT_EOK) ++w->pending;
- else xfr_task_free(rq);
- pthread_mutex_lock(&xfr->mx);
+ break;
+ }
- if (w->pending - was_pending > XFR_CHUNKLEN)
- break;
+
+ /* Limit number of transfers. */
+ knot_ns_xfr_t *rq = HEAD(xfr->queue);
+ unsigned is_transfer = xfr_task_is_transfer(rq);
+ if (is_transfer && !xfr_pending_incr(xfr)) {
+ pthread_mutex_unlock(&xfr->mx);
+ break;
}
+
+ rem_node(&rq->n);
pthread_mutex_unlock(&xfr->mx);
+
+ /* Start asynchronous connect. */
+ xfr_async_setbuf(rq, buf, buflen);
+ if (xfr_async_start(&set, rq) != KNOT_EOK) {
+ if (is_transfer)
+ xfr_pending_decr(xfr);
+ xfr_task_close(rq);
+ break;
+ }
+
+ ++newconns;
}
/* Check pending threads. */
- if (dt_is_cancelled(thread) || w->pending == 0) {
+ if (dt_is_cancelled(thread) || set.n == 0) {
break;
}
/* Poll fdset. */
- int nfds = fdset_wait(w->pool.fds, (XFR_SWEEP_INTERVAL/2) * 1000);
+ int nfds = poll(set.pfd, set.n, XFR_SWEEP_INTERVAL * 1000);
if (nfds < 0) {
- if (errno == EINTR) continue;
+ if (errno == EINTR)
+ continue;
break;
}
/* Iterate fdset. */
- fdset_it_t it;
- fdset_begin(w->pool.fds, &it);
- while(nfds > 0) {
-
- /* Find data. */
- knot_ns_xfr_t *rq = xfr_task_get(w, it.fd);
- dbg_xfr_verb("xfr: worker=%p processing event on "
- "fd=%d data=%p.\n",
- w, it.fd, rq);
- if (rq) {
+ unsigned i = 0;
+ while (nfds > 0 && i < set.n && !dt_is_cancelled(thread)) {
+
+ if (!(set.pfd[i].revents & set.pfd[i].events)) {
+ /* Skip inactive. */
+ ++i;
+ continue;
+ } else {
+ /* One less active event. */
+ --nfds;
+ }
+
+ /* Process pending tasks. */
+ knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set.ctx[i];
+ if (rq->flags & XFR_FLAG_CONNECTING) {
+ ret = xfr_async_finish(&set, i);
+ } else {
ret = xfr_process_event(w, rq);
- if (ret != KNOT_EOK) {
- xfr_task_close(w, it.fd);
- --w->pending;
- --it.pos; /* Reset iterator */
- }
}
- /* Next fd. */
- if (fdset_next(w->pool.fds, &it) < 0) {
- break;
+ /* Check task state. */
+ if (ret != KNOT_EOK) {
+ if (xfr_task_is_transfer(rq))
+ xfr_pending_decr(xfr);
+ xfr_task_close(rq);
+ socket_close(set.pfd[i].fd);
+ fdset_remove(&set, i);
+ continue; /* Stay on the same index. */
}
+
+ /* Next active. */
+ ++i;
}
/* Sweep inactive. */
timev_t now;
if (time_now(&now) == 0) {
if (now.tv_sec >= next_sweep.tv_sec) {
- fdset_sweep(w->pool.fds, &xfr_sweep, w);
+ fdset_sweep(&set, &xfr_sweep, xfr);
memcpy(&next_sweep, &now, sizeof(next_sweep));
next_sweep.tv_sec += XFR_SWEEP_INTERVAL;
}
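The "Stay on the same index" comment above is the loop's key invariant: assuming fdset_remove() backfills the vacated slot with the last entry (swap removal), incrementing i after a removal would skip the swapped-in descriptor. The pattern in isolation, with handle() as a hypothetical event processor:

    unsigned i = 0;
    while (i < set.n) {
            if (!(set.pfd[i].revents & set.pfd[i].events)) {
                    ++i;                   /* Quiet descriptor, advance. */
                    continue;
            }
            if (handle(set.ctx[i]) != 0) {
                    fdset_remove(&set, i); /* Last entry moves into slot i. */
                    continue;              /* Re-examine slot i. */
            }
            ++i;
    }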
@@ -1066,22 +1144,17 @@ int xfr_worker(dthread_t *thread)
}
/* Cancel existing connections. */
- size_t keylen = 0;
- ahtable_iter_t i;
- ahtable_iter_begin(w->pool.t, &i, false);
- while (!ahtable_iter_finished(&i)) {
- int *key = (int *)ahtable_iter_key(&i, &keylen);
- xfr_task_close(w, *key);
- ahtable_iter_next(&i);
- }
- ahtable_iter_free(&i);
-
- /* Destroy data structures. */
- fdset_destroy(w->pool.fds);
- ahtable_free(w->pool.t);
- free(buf);
+ for (unsigned i = 0; i < set.n; ++i) {
+ knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set.ctx[i];
+ socket_close(set.pfd[i].fd);
+ if (xfr_task_is_transfer(rq))
+ xfr_pending_decr(xfr);
+ xfr_task_free(rq);
+ }
dbg_xfr_verb("xfr: worker=%p finished.\n", w);
+ fdset_clear(&set);
+ free(buf);
return KNOT_EOK;
}
@@ -1115,6 +1188,7 @@ xfrhandler_t *xfr_create(size_t thrcount, knot_nameserver_t *ns)
/* Create tasks structure and mutex. */
pthread_mutex_init(&xfr->mx, 0);
+ pthread_mutex_init(&xfr->pending_mx, 0);
init_list(&xfr->queue);
/* Assign worker threads. */
@@ -1133,6 +1207,7 @@ int xfr_free(xfrhandler_t *xfr)
}
/* Free RR mutex. */
+ pthread_mutex_destroy(&xfr->pending_mx);
pthread_mutex_destroy(&xfr->mx);
/* Free pending queue. */