Diffstat (limited to 'src/knot/server/xfr-handler.c')
-rw-r--r-- | src/knot/server/xfr-handler.c | 417 |
1 files changed, 246 insertions, 171 deletions
diff --git a/src/knot/server/xfr-handler.c b/src/knot/server/xfr-handler.c
index c7c1abf..90b0a42 100644
--- a/src/knot/server/xfr-handler.c
+++ b/src/knot/server/xfr-handler.c
@@ -44,9 +44,9 @@
 #include "libknot/rrset.h"
 
 /* Constants */
-#define XFR_CHUNKLEN 3 /*! Number of requests assigned in a single pass. */
+#define XFR_MAX_TASKS 1024 /*! Maximum pending tasks. */
+#define XFR_CHUNKLEN 16 /*! Number of requests assigned in a single pass. */
 #define XFR_SWEEP_INTERVAL 2 /*! [seconds] between sweeps. */
-#define XFR_BUFFER_SIZE 65535 /*! Do not change this - maximum value for UDP packet length. */
 #define XFR_MSG_DLTTR 9 /*! Index of letter differentiating IXFR/AXFR in log msg. */
 
 /* Messages */
@@ -62,14 +62,37 @@ static knot_lookup_table_t xfr_type_table[] = {
     { XFR_TYPE_AIN, NULL }
 };
 static knot_lookup_table_t xfr_result_table[] = {
-    { XFR_TYPE_AIN, "Started" },
-    { XFR_TYPE_IIN, "Started" },
-    { XFR_TYPE_SOA, "Query issued" },
-    { XFR_TYPE_NOTIFY, "Query issued" },
-    { XFR_TYPE_FORWARD, "Forwarded query" },
+    { XFR_TYPE_AIN, "Started." },
+    { XFR_TYPE_IIN, "Started." },
+    { XFR_TYPE_SOA, "Query issued." },
+    { XFR_TYPE_NOTIFY, "Query issued." },
+    { XFR_TYPE_FORWARD, "Forwarded query." },
     { XFR_TYPE_AIN, NULL }
 };
 
+/* Limits. */
+static bool xfr_pending_incr(xfrhandler_t *xfr)
+{
+    bool ret = false;
+    pthread_mutex_lock(&xfr->pending_mx);
+    rcu_read_lock();
+    if (xfr->pending < conf()->xfers) {
+        ++xfr->pending;
+        ret = true;
+    }
+    rcu_read_unlock();
+    pthread_mutex_unlock(&xfr->pending_mx);
+
+    return ret;
+}
+
+static void xfr_pending_decr(xfrhandler_t *xfr)
+{
+    pthread_mutex_lock(&xfr->pending_mx);
+    --xfr->pending;
+    pthread_mutex_unlock(&xfr->pending_mx);
+}
+
 /* I/O wrappers */
 
 static int xfr_send_tcp(int fd, sockaddr_t *addr, uint8_t *msg, size_t msglen)
@@ -84,28 +107,6 @@ static int xfr_recv_tcp(int fd, sockaddr_t *addr, uint8_t *buf, size_t buflen)
 static int xfr_recv_udp(int fd, sockaddr_t *addr, uint8_t *buf, size_t buflen)
 { return recvfrom(fd, buf, buflen, 0, (struct sockaddr *)addr, &addr->len); }
 
-/* Context fetching. */
-
-static knot_ns_xfr_t* xfr_task_get(xfrworker_t *w, int fd)
-{
-    value_t *val = ahtable_tryget(w->pool.t, (const char*)&fd, sizeof(int));
-    if (!val) return NULL;
-    return *val;
-}
-
-static void xfr_task_set(xfrworker_t *w, int fd, knot_ns_xfr_t *rq)
-{
-    fdset_add(w->pool.fds, fd, OS_EV_READ);
-    value_t *val = ahtable_get(w->pool.t, (const char*)&fd, sizeof(int));
-    *val = rq;
-}
-
-static void xfr_task_clear(xfrworker_t *w, int fd)
-{
-    ahtable_del(w->pool.t, (const char*)&fd, sizeof(int));
-    fdset_remove(w->pool.fds, fd);
-}
-
 /*! \brief Wrapper function for answering AXFR/OUT. */
 static int xfr_answer_axfr(knot_nameserver_t *ns, knot_ns_xfr_t *xfr)
 {
@@ -241,10 +242,8 @@ static int xfr_task_setsig(knot_ns_xfr_t *rq, knot_tsig_key_t *key)
 static int xfr_task_connect(knot_ns_xfr_t *rq)
 {
     /* Create socket by type. */
-    int stype = SOCK_DGRAM;
-    if (rq->flags & XFR_FLAG_TCP) {
-        stype = SOCK_STREAM;
-    }
+    int ret = 0;
+    int stype = (rq->flags & XFR_FLAG_TCP) ? SOCK_STREAM : SOCK_DGRAM;
     int fd = socket_create(sockaddr_family(&rq->addr), stype, 0);
     if (fd < 0) {
         return KNOT_ERROR;
     }
@@ -257,14 +256,22 @@ static int xfr_task_connect(knot_ns_xfr_t *rq)
             return KNOT_ERROR;
         }
     }
+
     /* Connect if TCP. */
     if (rq->flags & XFR_FLAG_TCP) {
-        if (connect(fd, (struct sockaddr *)&rq->addr, rq->addr.len) < 0) {
+        if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
+            ; /* Go silently with blocking if it fails. */
+
+        ret = connect(fd, (struct sockaddr *)&rq->addr, rq->addr.len);
+        if (ret != 0 && errno != EINPROGRESS) {
             socket_close(fd);
             return KNOT_ECONNREFUSED;
         }
     }
 
+    /* Set up for UDP as well to trigger 'send query' event. */
+    rq->flags |= XFR_FLAG_CONNECTING;
+
     /* Store new socket descriptor. */
     rq->session = fd;
     return KNOT_EOK;
@@ -299,15 +306,12 @@ static void xfr_task_cleanup(knot_ns_xfr_t *rq)
     hattrie_clear(rq->lookup_tree);
 }
 
-/*! \brief Close and free task. */
-static int xfr_task_close(xfrworker_t *w, int fd)
+/*! \brief End task properly and free it. */
+static int xfr_task_close(knot_ns_xfr_t *rq)
 {
-    knot_ns_xfr_t *rq = xfr_task_get(w, fd);
-    if (!rq) return KNOT_ENOENT;
-
     /* Update xfer state. */
+    zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
     if (rq->type == XFR_TYPE_AIN || rq->type == XFR_TYPE_IIN) {
-        zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
         pthread_mutex_lock(&zd->lock);
         if (zd->xfr_in.state == XFR_PENDING) {
             zd->xfr_in.state = XFR_IDLE;
@@ -315,15 +319,25 @@ static int xfr_task_close(xfrworker_t *w, int fd)
         pthread_mutex_unlock(&zd->lock);
     }
 
+    /* Reschedule failed bootstrap. */
+    if (rq->type == XFR_TYPE_AIN && !knot_zone_contents(rq->zone)) {
+        int tmr_s = AXFR_BOOTSTRAP_RETRY * tls_rand();
+        event_t *ev = zd->xfr_in.timer;
+        if (ev) {
+            evsched_cancel(ev->parent, ev);
+            evsched_schedule(ev->parent, ev, tmr_s);
+        }
+        log_zone_notice("%s Bootstrap failed, next attempt in %d seconds.\n",
+                        rq->msg, tmr_s / 1000);
+    }
+
     /* Close socket and free task. */
-    xfr_task_clear(w, fd);
     xfr_task_free(rq);
-    socket_close(fd);
     return KNOT_EOK;
 }
 
 /*! \brief Timeout handler. */
-static int xfr_task_expire(fdset_t *fds, knot_ns_xfr_t *rq)
+static int xfr_task_expire(fdset_t *set, int i, knot_ns_xfr_t *rq)
 {
     /* Fetch related zone (refcounted, no RCU). */
     knot_zone_t *zone = (knot_zone_t *)rq->zone;
@@ -335,7 +349,7 @@ static int xfr_task_expire(fdset_t *fds, knot_ns_xfr_t *rq)
     case XFR_TYPE_NOTIFY:
         if ((long)--rq->data > 0) { /* Retries */
             notify_create_request(contents, rq->wire, &rq->wire_size);
-            fdset_set_watchdog(fds, rq->session, NOTIFY_TIMEOUT);
+            fdset_set_watchdog(set, i, NOTIFY_TIMEOUT);
             rq->send(rq->session, &rq->addr, rq->wire, rq->wire_size);
             log_zone_info("%s Query issued (serial %u).\n",
                           rq->msg, knot_zone_serial(contents));
@@ -370,14 +384,6 @@ static int xfr_task_start(knot_ns_xfr_t *rq)
         return KNOT_ECONNREFUSED;
     }
 
-    /* Connect to remote. */
-    if (rq->session <= 0) {
-        ret = xfr_task_connect(rq);
-        if (ret != KNOT_EOK) {
-            return ret;
-        }
-    }
-
     /* Prepare TSIG key if set. */
     int add_tsig = 0;
     if (rq->tsig_key) {
@@ -437,20 +443,13 @@ static int xfr_task_start(knot_ns_xfr_t *rq)
     return KNOT_EOK;
 }
 
-static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, size_t buflen)
+static int xfr_task_is_transfer(knot_ns_xfr_t *rq)
 {
-    /* Check if not already processing, zone is refcounted. */
-    zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
-
-    /* Check if the zone is not discarded. */
-    if (!zd || knot_zone_flags(rq->zone) & KNOT_ZONE_DISCARDED) {
-        dbg_xfr_verb("xfr: request on a discarded zone, ignoring\n");
-        return KNOT_EINVAL;
-    }
-
-    /* Update XFR message prefix. */
-    xfr_task_setmsg(rq, NULL);
+    return rq->type == XFR_TYPE_AIN || rq->type == XFR_TYPE_IIN;
+}
+
+static void xfr_async_setbuf(knot_ns_xfr_t *rq, uint8_t *buf, size_t buflen)
+{
     /* Update request. */
     rq->wire = buf;
     rq->wire_size = buflen;
@@ -461,17 +460,81 @@ static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, siz
         rq->send = &xfr_send_tcp;
         rq->recv = &xfr_recv_tcp;
     }
+}
+
+static int xfr_async_start(fdset_t *set, knot_ns_xfr_t *rq)
+{
+    /* Update XFR message prefix. */
+    int ret = KNOT_EOK;
+    xfr_task_setmsg(rq, NULL);
+
+    /* Connect to remote. */
+    if (rq->session <= 0)
+        ret = xfr_task_connect(rq);
+
+    /* Add to set. */
+    if (ret == KNOT_EOK) {
+        unsigned flags = POLLIN;
+        if (rq->flags & XFR_FLAG_CONNECTING)
+            flags = POLLOUT;
+        int next_id = fdset_add(set, rq->session, flags, rq);
+        if (next_id >= 0) {
+            /* Set default connection timeout. */
+            rcu_read_lock();
+            fdset_set_watchdog(set, next_id, conf()->max_conn_reply);
+            rcu_read_unlock();
+        } else {
+            /* Or refuse if failed. */
+            ret = KNOT_ECONNREFUSED;
+        }
+    }
+
+    return ret;
+}
+
+static int xfr_async_state(knot_ns_xfr_t *rq)
+{
+    /* Check socket status. */
+    int err = EINVAL;
+    socklen_t len = sizeof(int);
+    if (getsockopt(rq->session, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
+        return KNOT_ERROR;
+    if (err != 0)
+        return knot_map_errno(err);
+    return KNOT_EOK;
+}
+
+static int xfr_async_finish(fdset_t *set, unsigned id)
+{
+    /* Drop back to synchronous mode. */
+    int ret = KNOT_EOK;
+    knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set->ctx[id];
+    if ((ret = xfr_async_state(rq)) == KNOT_EOK) {
+        rq->flags &= ~XFR_FLAG_CONNECTING;
+        set->pfd[id].events = POLLIN;
+        if (fcntl(set->pfd[id].fd, F_SETFL, 0) < 0)
+            ;
+    } else {
+        /* Do not attempt to start on broken connection. */
+        return KNOT_ECONNREFUSED;
+    }
+
+    /* Check if the zone is not discarded. */
+    zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
+    if (!zd || knot_zone_flags(rq->zone) & KNOT_ZONE_DISCARDED) {
+        dbg_xfr_verb("xfr: request on a discarded zone, ignoring\n");
+        return KNOT_EINVAL;
+    }
 
     /* Handle request. */
     dbg_xfr("%s processing request type '%d'\n", rq->msg, rq->type);
-    int ret = xfr_task_start(rq);
+    ret = xfr_task_start(rq);
     const char *msg = knot_strerror(ret);
     knot_lookup_table_t *xd = knot_lookup_by_id(xfr_result_table, rq->type);
     if (xd && ret == KNOT_EOK) {
         msg = xd->name;
     }
 
-    int bootstrap_fail = 0;
     switch(rq->type) {
     case XFR_TYPE_AIN:
     case XFR_TYPE_IIN:
@@ -480,36 +543,28 @@ static int xfr_task_process(xfrworker_t *w, knot_ns_xfr_t* rq, uint8_t *buf, siz
             pthread_mutex_lock(&zd->lock);
             zd->xfr_in.state = XFR_IDLE;
             pthread_mutex_unlock(&zd->lock);
-            bootstrap_fail = !knot_zone_contents(rq->zone);
         }
         break;
-    case XFR_TYPE_NOTIFY: /* Send on first timeout <0,5>s. */
-        fdset_set_watchdog(w->pool.fds, rq->session, (int)(tls_rand() * 5));
-        xfr_task_set(w, rq->session, rq);
+    case XFR_TYPE_NOTIFY:
+        /* This is a bit of a hack to adapt NOTIFY lifetime tracking.
+         * When NOTIFY event enters handler, it shouldn't be sent immediately.
+         * To accomodate for this, <0, 5>s random delay is set on
+         * event startup, so the first query fires when this timer
+         * expires. */
+        fdset_set_watchdog(set, id, (int)(tls_rand() * 5));
         return KNOT_EOK;
     case XFR_TYPE_SOA:
     case XFR_TYPE_FORWARD:
-        fdset_set_watchdog(w->pool.fds, rq->session, conf()->max_conn_reply);
+        fdset_set_watchdog(set, id, conf()->max_conn_reply);
         break;
     default:
        break;
     }
 
     if (ret == KNOT_EOK) {
-        xfr_task_set(w, rq->session, rq);
-        log_server_info("%s %s.\n", rq->msg, msg);
-    } else if (bootstrap_fail){
-        int tmr_s = AXFR_BOOTSTRAP_RETRY * tls_rand();
-        event_t *ev = zd->xfr_in.timer;
-        if (ev) {
-            evsched_cancel(ev->parent, ev);
-            evsched_schedule(ev->parent, ev, tmr_s);
-        }
-        log_zone_notice("%s Bootstrap failed, next "
-                        "attempt in %d seconds.\n",
-                        rq->msg, tmr_s / 1000);
+        log_server_info("%s %s\n", rq->msg, msg);
     } else {
-        log_server_error("%s %s.\n", rq->msg, msg);
+        log_server_error("%s %s\n", rq->msg, msg);
     }
 
     return ret;
@@ -745,16 +800,11 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
     /* Only for successful xfers. */
     if (ret > 0) {
         ret = xfr_task_finalize(w, rq);
-        if (ret != KNOT_EOK && !knot_zone_contents(rq->zone)) {
-
-            /* AXFR bootstrap timeout. */
-            zonedata_t *zd = (zonedata_t *)knot_zone_data(rq->zone);
-            int tmr_s = AXFR_BOOTSTRAP_RETRY * tls_rand();
-            zd->xfr_in.bootstrap_retry = tmr_s;
-            log_zone_info("%s Next attempt to bootstrap "
-                          "in %d seconds.\n",
-                          rq->msg, tmr_s / 1000);
-        } else if (ret == KNOT_EBUSY && rq->type == XFR_TYPE_IIN) {
+
+        /* EBUSY on incremental transfer has a special meaning and
+         * is caused by a journal not able to free up space for incoming
+         * transfer, thus forcing to start a new full zone transfer. */
+        if (ret == KNOT_EBUSY && rq->type == XFR_TYPE_IIN) {
             return xfr_start_axfr(w, rq, diff_nospace_msg);
         } else {
@@ -763,7 +813,7 @@ static int xfr_task_xfer(xfrworker_t *w, knot_ns_xfr_t *rq)
         }
 
         /* Update REFRESH/RETRY */
-        zones_schedule_refresh(rq->zone, 0);
+        zones_schedule_refresh(rq->zone, REFRESH_DEFAULT);
         ret = KNOT_ECONNREFUSED; /* Disconnect */
     }
 
@@ -801,37 +851,36 @@ static int xfr_process_event(xfrworker_t *w, knot_ns_xfr_t *rq)
     }
 }
 
-/*! \brief Sweep non-replied connection. */
-static void xfr_sweep(fdset_t *set, int fd, void *data)
+/*! \brief Sweep inactive connection. */
+static enum fdset_sweep_state xfr_sweep(fdset_t *set, int i, void *data)
 {
-    dbg_xfr("xfr: sweeping fd=%d\n", fd);
-    if (!set || !data) {
-        dbg_xfr("xfr: invalid sweep operation on NULL worker or set\n");
-        return;
-    }
-    xfrworker_t *w = (xfrworker_t *)data;
-    knot_ns_xfr_t *rq = xfr_task_get(w, fd);
-    if (!rq) {
-        dbg_xfr("xfr: NULL data to sweep\n");
-        return;
-    }
+    assert(set && i < set->n && i >= 0);
+
+    knot_ns_xfr_t *rq = set->ctx[i];
+    xfrhandler_t *xfr = (xfrhandler_t *)data;
 
-    /* Skip non-sweepable types. */
+    /* Expire only UDP requests. */
     int ret = KNOT_ECONNREFUSED;
     switch(rq->type) {
     case XFR_TYPE_SOA:
    case XFR_TYPE_NOTIFY:
    case XFR_TYPE_FORWARD:
-        ret = xfr_task_expire(set, rq);
+        ret = xfr_task_expire(set, i, rq);
         break;
     default:
         break;
     }
 
+    /* Close if not valid anymore. */
     if (ret != KNOT_EOK) {
-        xfr_task_close(w, fd);
-        --w->pending;
+        if (xfr_task_is_transfer(rq))
+            xfr_pending_decr(xfr);
+        xfr_task_close(rq);
+        socket_close(set->pfd[i].fd);
+        return FDSET_SWEEP;
     }
+
+    return FDSET_KEEP;
 }
 
 /*! \brief Check TSIG if exists. */
@@ -969,7 +1018,7 @@ int xfr_worker(dthread_t *thread)
     xfrhandler_t *xfr = w->master;
 
     /* Buffer for answering. */
-    size_t buflen = XFR_BUFFER_SIZE;
+    size_t buflen = SOCKET_MTU_SZ;
     uint8_t* buf = malloc(buflen);
     if (buf == NULL) {
         dbg_xfr("xfr: failed to allocate buffer for XFR worker\n");
@@ -981,84 +1030,113 @@ int xfr_worker(dthread_t *thread)
     time_now(&next_sweep);
     next_sweep.tv_sec += XFR_SWEEP_INTERVAL;
 
-    int limit = XFR_CHUNKLEN * 3;
-    if (conf() && conf()->xfers > 0) {
-        limit = conf()->xfers;
+    /* Approximate thread capacity limits. */
+    unsigned threads = w->master->unit->size;
+    unsigned thread_capacity = XFR_MAX_TASKS / threads;
+
+    /* Set of connections. */
+    fdset_t set;
+    int ret = fdset_init(&set, thread_capacity);
+    if (ret != KNOT_EOK) {
+        free(buf);
+        return ret;
     }
-    unsigned thread_capacity = limit / w->master->unit->size;
-    if (thread_capacity < 1) thread_capacity = 1;
-    w->pool.fds = fdset_new();
-    w->pool.t = ahtable_create();
-    w->pending = 0;
 
     /* Accept requests. */
-    int ret = 0;
     dbg_xfr_verb("xfr: worker=%p starting\n", w);
     for (;;) {
         /* Populate pool with new requests. */
-        if (w->pending <= thread_capacity) {
-            pthread_mutex_lock(&xfr->mx);
-            unsigned was_pending = w->pending;
-            while (!EMPTY_LIST(xfr->queue)) {
-                knot_ns_xfr_t *rq = HEAD(xfr->queue);
-                rem_node(&rq->n);
+        unsigned newconns = 0;
+        for (;;) {
+            /* Do not exceed thread capacity. */
+            if (set.n >= thread_capacity || newconns > XFR_CHUNKLEN)
+                break;
 
-                /* Unlock queue and process request. */
+            /* Tak first request. */
+            pthread_mutex_lock(&xfr->mx);
+            if (EMPTY_LIST(xfr->queue)) {
                 pthread_mutex_unlock(&xfr->mx);
-                ret = xfr_task_process(w, rq, buf, buflen);
-                if (ret == KNOT_EOK) ++w->pending;
-                else xfr_task_free(rq);
-                pthread_mutex_lock(&xfr->mx);
+                break;
+            }
 
-                if (w->pending - was_pending > XFR_CHUNKLEN)
-                    break;
+            /* Limit number of transfers. */
+            knot_ns_xfr_t *rq = HEAD(xfr->queue);
+            unsigned is_transfer = xfr_task_is_transfer(rq);
+            if (is_transfer && !xfr_pending_incr(xfr)) {
+                pthread_mutex_unlock(&xfr->mx);
+                break;
             }
+
+            rem_node(&rq->n);
             pthread_mutex_unlock(&xfr->mx);
+
+            /* Start asynchronous connect. */
+            xfr_async_setbuf(rq, buf, buflen);
+            if (xfr_async_start(&set, rq) != KNOT_EOK) {
+                if (is_transfer)
+                    xfr_pending_decr(xfr);
+                xfr_task_close(rq);
+                break;
+            }
+
+            ++newconns;
        }
 
         /* Check pending threads. */
-        if (dt_is_cancelled(thread) || w->pending == 0) {
+        if (dt_is_cancelled(thread) || set.n == 0) {
             break;
         }
 
         /* Poll fdset. */
-        int nfds = fdset_wait(w->pool.fds, (XFR_SWEEP_INTERVAL/2) * 1000);
+        int nfds = poll(set.pfd, set.n, XFR_SWEEP_INTERVAL * 1000);
        if (nfds < 0) {
-            if (errno == EINTR) continue;
+            if (errno == EINTR)
+                continue;
            break;
        }
 
         /* Iterate fdset. */
-        fdset_it_t it;
-        fdset_begin(w->pool.fds, &it);
-        while(nfds > 0) {
-
-            /* Find data. */
-            knot_ns_xfr_t *rq = xfr_task_get(w, it.fd);
-            dbg_xfr_verb("xfr: worker=%p processing event on "
-                         "fd=%d data=%p.\n",
-                         w, it.fd, rq);
-            if (rq) {
+        unsigned i = 0;
+        while (nfds > 0 && i < set.n && !dt_is_cancelled(thread)) {
+
+            if (!(set.pfd[i].revents & set.pfd[i].events)) {
+                /* Skip inactive. */
+                ++i;
+                continue;
+            } else {
+                /* One less active event. */
+                --nfds;
+            }
+
+            /* Process pending tasks. */
+            knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set.ctx[i];
+            if (rq->flags & XFR_FLAG_CONNECTING) {
+                ret = xfr_async_finish(&set, i);
+            } else {
                 ret = xfr_process_event(w, rq);
-                if (ret != KNOT_EOK) {
-                    xfr_task_close(w, it.fd);
-                    --w->pending;
-                    --it.pos; /* Reset iterator */
-                }
             }
 
-            /* Next fd. */
-            if (fdset_next(w->pool.fds, &it) < 0) {
-                break;
+            /* Check task state. */
+            if (ret != KNOT_EOK) {
+                if (xfr_task_is_transfer(rq))
+                    xfr_pending_decr(xfr);
+                xfr_task_close(rq);
+                socket_close(set.pfd[i].fd);
+                fdset_remove(&set, i);
+                continue; /* Stay on the same index. */
            }
+
+            /* Next active. */
+            ++i;
        }
 
         /* Sweep inactive. */
         timev_t now;
         if (time_now(&now) == 0) {
             if (now.tv_sec >= next_sweep.tv_sec) {
-                fdset_sweep(w->pool.fds, &xfr_sweep, w);
+                fdset_sweep(&set, &xfr_sweep, xfr);
                 memcpy(&next_sweep, &now, sizeof(next_sweep));
                 next_sweep.tv_sec += XFR_SWEEP_INTERVAL;
@@ -1066,22 +1144,17 @@ int xfr_worker(dthread_t *thread)
     }
 
     /* Cancel existing connections. */
-    size_t keylen = 0;
-    ahtable_iter_t i;
-    ahtable_iter_begin(w->pool.t, &i, false);
-    while (!ahtable_iter_finished(&i)) {
-        int *key = (int *)ahtable_iter_key(&i, &keylen);
-        xfr_task_close(w, *key);
-        ahtable_iter_next(&i);
-    }
-    ahtable_iter_free(&i);
-
-    /* Destroy data structures. */
-    fdset_destroy(w->pool.fds);
-    ahtable_free(w->pool.t);
-    free(buf);
+    for (unsigned i = 0; i < set.n; ++i) {
+        knot_ns_xfr_t *rq = (knot_ns_xfr_t *)set.ctx[i];
+        socket_close(set.pfd[i].fd);
+        if (xfr_task_is_transfer(rq))
+            xfr_pending_decr(xfr);
+        xfr_task_free(rq);
+    }
 
     dbg_xfr_verb("xfr: worker=%p finished.\n", w);
+    fdset_clear(&set);
+    free(buf);
     return KNOT_EOK;
 }
 
@@ -1115,6 +1188,7 @@ xfrhandler_t *xfr_create(size_t thrcount, knot_nameserver_t *ns)
 
     /* Create tasks structure and mutex. */
     pthread_mutex_init(&xfr->mx, 0);
+    pthread_mutex_init(&xfr->pending_mx, 0);
    init_list(&xfr->queue);
 
    /* Assign worker threads. */
@@ -1133,6 +1207,7 @@ int xfr_free(xfrhandler_t *xfr)
    }
 
     /* Free RR mutex. */
+    pthread_mutex_destroy(&xfr->pending_mx);
     pthread_mutex_destroy(&xfr->mx);
 
     /* Free pending queue. */
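Note: the asynchronous connect flow added in this commit (xfr_task_connect() together with xfr_async_finish()/xfr_async_state()) follows the usual non-blocking connect pattern: switch the socket to O_NONBLOCK, let connect() return EINPROGRESS, poll the descriptor for POLLOUT, read the real outcome back with getsockopt(SO_ERROR), and only then drop back to blocking mode and wait for POLLIN. The standalone sketch below illustrates that sequence with plain BSD sockets; the helper name and the IPv4-only setup are illustrative and not part of Knot's socket_create()/fdset API.

/* Illustrative sketch of a non-blocking connect with a poll() timeout.
 * Helper name and parameters are hypothetical, not Knot API. */
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static int connect_nonblocking(const char *ip, uint16_t port, int timeout_ms)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    /* Switch to non-blocking mode; on failure fall back to a blocking
     * connect, as the patch does. */
    int fl = fcntl(fd, F_GETFL, 0);
    if (fl >= 0)
        (void)fcntl(fd, F_SETFL, fl | O_NONBLOCK);

    struct sockaddr_in sa;
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &sa.sin_addr) != 1) {
        close(fd);
        return -1;
    }

    /* EINPROGRESS means the TCP handshake continues in the background. */
    if (connect(fd, (struct sockaddr *)&sa, sizeof(sa)) != 0 &&
        errno != EINPROGRESS) {
        close(fd);
        return -1;
    }

    /* Wait until the socket becomes writable (handshake finished). */
    struct pollfd pfd = { .fd = fd, .events = POLLOUT };
    if (poll(&pfd, 1, timeout_ms) != 1) {
        close(fd);
        return -1;
    }

    /* Writability alone does not mean success - read the pending error. */
    int err = 0;
    socklen_t len = sizeof(err);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0) {
        close(fd);
        return -1;
    }

    /* Connected; restore blocking mode and poll for POLLIN from now on. */
    if (fl >= 0)
        (void)fcntl(fd, F_SETFL, fl);
    return fd;
}

Waiting for POLLOUT alone is not enough: the descriptor also becomes writable when the connection attempt fails, so the SO_ERROR check is what actually distinguishes success from failure, mirroring what xfr_async_state() does in the patch.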