diff options
Diffstat (limited to 'usr/src/lib/varpd')
38 files changed, 5092 insertions, 31 deletions
diff --git a/usr/src/lib/varpd/Makefile b/usr/src/lib/varpd/Makefile index 0962119d1c..7c78fb3145 100644 --- a/usr/src/lib/varpd/Makefile +++ b/usr/src/lib/varpd/Makefile @@ -10,10 +10,10 @@ # # -# Copyright 2018 Joyent, Inc. +# Copyright 2021 Joyent, Inc. # -SUBDIRS = libvarpd direct files +SUBDIRS = libvarpd direct files svp all := TARGET = all clean := TARGET = clean diff --git a/usr/src/lib/varpd/direct/Makefile.com b/usr/src/lib/varpd/direct/Makefile.com index 4e8564bae0..9d62140620 100644 --- a/usr/src/lib/varpd/direct/Makefile.com +++ b/usr/src/lib/varpd/direct/Makefile.com @@ -10,7 +10,7 @@ # # -# Copyright 2015 Joyent, Inc. +# Copyright 2021 Joyent, Inc. # LIBRARY = libvarpd_direct.a diff --git a/usr/src/lib/varpd/direct/common/llib-lvarpd_direct b/usr/src/lib/varpd/direct/common/llib-lvarpd_direct new file mode 100644 index 0000000000..03c34f4fcb --- /dev/null +++ b/usr/src/lib/varpd/direct/common/llib-lvarpd_direct @@ -0,0 +1,18 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +/* LINTLIBRARY */ +/* PROTOLIB1 */ + diff --git a/usr/src/lib/varpd/direct/sparc/Makefile b/usr/src/lib/varpd/direct/sparc/Makefile new file mode 100644 index 0000000000..f2b4f63da5 --- /dev/null +++ b/usr/src/lib/varpd/direct/sparc/Makefile @@ -0,0 +1,18 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. 
A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com + +install: all $(ROOTLIBS) $(ROOTLINKS) $(ROOTLINT) diff --git a/usr/src/lib/varpd/direct/sparcv9/Makefile b/usr/src/lib/varpd/direct/sparcv9/Makefile new file mode 100644 index 0000000000..d552642882 --- /dev/null +++ b/usr/src/lib/varpd/direct/sparcv9/Makefile @@ -0,0 +1,19 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com +include ../../../Makefile.lib.64 + +install: all $(ROOTLIBS64) $(ROOTLINKS64) $(ROOTLINT64) diff --git a/usr/src/lib/varpd/files/Makefile b/usr/src/lib/varpd/files/Makefile index 511ea1f94d..6253d3c242 100644 --- a/usr/src/lib/varpd/files/Makefile +++ b/usr/src/lib/varpd/files/Makefile @@ -10,7 +10,7 @@ # # -# Copyright 2015 Joyent, Inc. +# Copyright 2021 Joyent, Inc. # include ../../Makefile.lib diff --git a/usr/src/lib/varpd/files/Makefile.com b/usr/src/lib/varpd/files/Makefile.com index 13ff2149ce..dc24395673 100644 --- a/usr/src/lib/varpd/files/Makefile.com +++ b/usr/src/lib/varpd/files/Makefile.com @@ -10,7 +10,7 @@ # # -# Copyright 2018 Joyent, Inc. +# Copyright 2020 Joyent, Inc. 
# LIBRARY = libvarpd_files.a diff --git a/usr/src/lib/varpd/files/common/llib-lvarpd_files b/usr/src/lib/varpd/files/common/llib-lvarpd_files new file mode 100644 index 0000000000..03c34f4fcb --- /dev/null +++ b/usr/src/lib/varpd/files/common/llib-lvarpd_files @@ -0,0 +1,18 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +/* LINTLIBRARY */ +/* PROTOLIB1 */ + diff --git a/usr/src/lib/varpd/files/sparc/Makefile b/usr/src/lib/varpd/files/sparc/Makefile new file mode 100644 index 0000000000..f2b4f63da5 --- /dev/null +++ b/usr/src/lib/varpd/files/sparc/Makefile @@ -0,0 +1,18 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com + +install: all $(ROOTLIBS) $(ROOTLINKS) $(ROOTLINT) diff --git a/usr/src/lib/varpd/files/sparcv9/Makefile b/usr/src/lib/varpd/files/sparcv9/Makefile new file mode 100644 index 0000000000..d552642882 --- /dev/null +++ b/usr/src/lib/varpd/files/sparcv9/Makefile @@ -0,0 +1,19 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. 
+# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com +include ../../../Makefile.lib.64 + +install: all $(ROOTLIBS64) $(ROOTLINKS64) $(ROOTLINT64) diff --git a/usr/src/lib/varpd/libvarpd/Makefile b/usr/src/lib/varpd/libvarpd/Makefile index 034ba30c1d..2dbc8d7c5d 100644 --- a/usr/src/lib/varpd/libvarpd/Makefile +++ b/usr/src/lib/varpd/libvarpd/Makefile @@ -10,7 +10,7 @@ # # -# Copyright 2015 Joyent, Inc. +# Copyright 2021 Joyent, Inc. # include ../../Makefile.lib diff --git a/usr/src/lib/varpd/libvarpd/Makefile.com b/usr/src/lib/varpd/libvarpd/Makefile.com index dc51193367..7f45bd30e2 100644 --- a/usr/src/lib/varpd/libvarpd/Makefile.com +++ b/usr/src/lib/varpd/libvarpd/Makefile.com @@ -10,7 +10,7 @@ # # -# Copyright 2015 Joyent, Inc. +# Copyright 2021 Joyent, Inc. # LIBRARY = libvarpd.a @@ -32,7 +32,8 @@ include ../../../Makefile.lib include ../../../Makefile.rootfs LIBS = $(DYNLIB) -LDLIBS += -lc -lavl -lumem -lidspace -lnvpair -lmd -lrename +LDLIBS += -lc -lavl -lumem -lidspace -lnvpair -lmd -lrename \ + -lbunyan CPPFLAGS += -I../common CERRWARN += -erroff=E_STRUCT_DERIVED_FROM_FLEX_MBR diff --git a/usr/src/lib/varpd/libvarpd/common/libvarpd.c b/usr/src/lib/varpd/libvarpd/common/libvarpd.c index 4e4c189a43..e4460089cc 100644 --- a/usr/src/lib/varpd/libvarpd/common/libvarpd.c +++ b/usr/src/lib/varpd/libvarpd/common/libvarpd.c @@ -98,6 +98,14 @@ libvarpd_create(varpd_handle_t **vphp) return (ret); } + if ((ret = bunyan_init("varpd", &vip->vdi_bunyan)) != 0) { + libvarpd_overlay_fini(vip); + umem_cache_destroy(vip->vdi_qcache); + id_space_destroy(vip->vdi_idspace); + umem_free(vip, sizeof (varpd_impl_t)); + return (ret); + } + libvarpd_persist_init(vip); avl_create(&vip->vdi_plugins, libvarpd_plugin_comparator, @@ -315,6 +323,13 @@ out: return (ret); } +const bunyan_logger_t * 
+libvarpd_plugin_bunyan(varpd_provider_handle_t *vhp) +{ + varpd_instance_t *inst = (varpd_instance_t *)vhp; + return (inst->vri_impl->vdi_bunyan); +} + static void libvarpd_prefork(void) { diff --git a/usr/src/lib/varpd/libvarpd/common/libvarpd_impl.h b/usr/src/lib/varpd/libvarpd/common/libvarpd_impl.h index f8530a7112..60f0dc5fff 100644 --- a/usr/src/lib/varpd/libvarpd/common/libvarpd_impl.h +++ b/usr/src/lib/varpd/libvarpd/common/libvarpd_impl.h @@ -53,6 +53,7 @@ typedef struct varpd_impl { avl_tree_t vdi_linstances; /* vdi_lock */ id_space_t *vdi_idspace; /* RO */ umem_cache_t *vdi_qcache; /* RO */ + bunyan_logger_t *vdi_bunyan; /* RO */ int vdi_overlayfd; /* RO */ int vdi_doorfd; /* vdi_lock */ int vdi_persistfd; /* vdi_plock */ diff --git a/usr/src/lib/varpd/libvarpd/common/libvarpd_overlay.c b/usr/src/lib/varpd/libvarpd/common/libvarpd_overlay.c index 167c004a90..f314440056 100644 --- a/usr/src/lib/varpd/libvarpd/common/libvarpd_overlay.c +++ b/usr/src/lib/varpd/libvarpd/common/libvarpd_overlay.c @@ -10,7 +10,7 @@ */ /* - * Copyright 2015 Joyent, Inc. + * Copyright 2021 Joyent, Inc. 
*/ /* diff --git a/usr/src/lib/varpd/libvarpd/common/libvarpd_plugin.c b/usr/src/lib/varpd/libvarpd/common/libvarpd_plugin.c index 176306a3f7..ac73286fdd 100644 --- a/usr/src/lib/varpd/libvarpd/common/libvarpd_plugin.c +++ b/usr/src/lib/varpd/libvarpd/common/libvarpd_plugin.c @@ -25,6 +25,7 @@ #include <dlfcn.h> #include <link.h> #include <stdio.h> +#include <bunyan.h> static varpd_impl_t *varpd_load_handle; static const char *varpd_load_path; @@ -58,18 +59,21 @@ libvarpd_plugin_alloc(uint_t version, int *errp) errp = &err; if (version != VARPD_VERSION_ONE) { - (void) fprintf(stderr, - "unsupported registration version %u - %s\n", - version, varpd_load_path); + (void) bunyan_warn(varpd_load_handle->vdi_bunyan, + "unsupported registration version", + BUNYAN_T_STRING, "module_path", varpd_load_path, + BUNYAN_T_INT32, "module_version", version, + BUNYAN_T_END); *errp = EINVAL; return (NULL); } vprp = umem_alloc(sizeof (varpd_plugin_register_t), UMEM_DEFAULT); if (vprp == NULL) { - (void) fprintf(stderr, - "failed to allocate registration handle - %s\n", - varpd_load_path); + (void) bunyan_warn(varpd_load_handle->vdi_bunyan, + "failed to allocate registration handle", + BUNYAN_T_STRING, "module_path", varpd_load_path, + BUNYAN_T_END); *errp = ENOMEM; return (NULL); } @@ -93,17 +97,20 @@ libvarpd_plugin_register(varpd_plugin_register_t *vprp) vpp = umem_alloc(sizeof (varpd_plugin_t), UMEM_DEFAULT); if (vpp == NULL) { - (void) fprintf(stderr, - "failed to allocate memory for the varpd_plugin_t - %s\n", - varpd_load_path); + (void) bunyan_warn(varpd_load_handle->vdi_bunyan, + "failed to allocate memory for the varpd_plugin_t", + BUNYAN_T_STRING, "module_path", varpd_load_path, + BUNYAN_T_END); return (ENOMEM); } /* Watch out for an evil plugin */ if (vprp->vpr_version != VARPD_VERSION_ONE) { - (void) fprintf(stderr, - "unsupported registration version %u - %s\n", - vprp->vpr_version, varpd_load_path); + (void) bunyan_warn(varpd_load_handle->vdi_bunyan, + "unsupported 
registration version", + BUNYAN_T_STRING, "module_path", varpd_load_path, + BUNYAN_T_INT32, "module_version", vprp->vpr_version, + BUNYAN_T_END); return (EINVAL); } @@ -114,9 +121,11 @@ libvarpd_plugin_register(varpd_plugin_register_t *vprp) mutex_enter(&varpd_load_handle->vdi_lock); lookup.vpp_name = vprp->vpr_name; if (avl_find(&varpd_load_handle->vdi_plugins, &lookup, NULL) != NULL) { - (void) fprintf(stderr, - "module already exists with requested name '%s' - %s\n", - vprp->vpr_name, varpd_load_path); + (void) bunyan_warn(varpd_load_handle->vdi_bunyan, + "module already exists with requested name", + BUNYAN_T_STRING, "module_path", varpd_load_path, + BUNYAN_T_STRING, "name", vprp->vpr_name, + BUNYAN_T_END); mutex_exit(&varpd_load_handle->vdi_lock); mutex_exit(&varpd_load_lock); umem_free(vpp, sizeof (varpd_plugin_t)); @@ -124,9 +133,10 @@ libvarpd_plugin_register(varpd_plugin_register_t *vprp) } vpp->vpp_name = strdup(vprp->vpr_name); if (vpp->vpp_name == NULL) { - (void) fprintf(stderr, - "failed to allocate memory to duplicate name - %s\n", - varpd_load_path); + (void) bunyan_warn(varpd_load_handle->vdi_bunyan, + "failed to allocate memory to duplicate name", + BUNYAN_T_STRING, "module_path", varpd_load_path, + BUNYAN_T_END); mutex_exit(&varpd_load_handle->vdi_lock); mutex_exit(&varpd_load_lock); umem_free(vpp, sizeof (varpd_plugin_t)); @@ -167,8 +177,11 @@ libvarpd_plugin_load_cb(varpd_impl_t *vip, const char *path, void *unused) varpd_load_path = path; dlp = dlopen(path, RTLD_LOCAL | RTLD_NOW); - if (dlp == NULL) - (void) fprintf(stderr, "dlopen failed - %s\n", path); + if (dlp == NULL) { + (void) bunyan_error(vip->vdi_bunyan, "dlopen failed", + BUNYAN_T_STRING, "module path", path, + BUNYAN_T_END); + } path = NULL; return (0); diff --git a/usr/src/lib/varpd/libvarpd/common/libvarpd_provider.h b/usr/src/lib/varpd/libvarpd/common/libvarpd_provider.h index b6910b9ed5..050b9e600a 100644 --- a/usr/src/lib/varpd/libvarpd/common/libvarpd_provider.h +++ 
b/usr/src/lib/varpd/libvarpd/common/libvarpd_provider.h @@ -36,8 +36,8 @@ * succeeds, then it should proceed to fill out the registration and then call, * libvarpd_plugin_register() with it. Regardless of whether it succeeds or * fails, it should call libvarpd_plugin_free(). In the case of failure, there - * is not much that the module should do, other than log some message to - * stderr. + * is not much that the module should do, other than log some message to the + * standard bunyan logger that exists. * * Once libvarpd_plugin_register() returns, the module should assume that any * of the operations it defined in the operation vector may be called and @@ -287,6 +287,7 @@ * context, including from the operation vectors. */ +#include <bunyan.h> #include <libvarpd.h> #include <libnvpair.h> #include <sys/socket.h> @@ -366,6 +367,7 @@ extern int libvarpd_plugin_register(varpd_plugin_register_t *); * Blowing up and logging */ extern void libvarpd_panic(const char *, ...) __NORETURN; +extern const bunyan_logger_t *libvarpd_plugin_bunyan(varpd_provider_handle_t *); /* * Misc. Information APIs diff --git a/usr/src/lib/varpd/libvarpd/common/llib-lvarpd b/usr/src/lib/varpd/libvarpd/common/llib-lvarpd new file mode 100644 index 0000000000..85150d3463 --- /dev/null +++ b/usr/src/lib/varpd/libvarpd/common/llib-lvarpd @@ -0,0 +1,19 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. 
+ */ + +/* LINTLIBRARY */ +/* PROTOLIB1 */ + +#include <libvarpd.h> diff --git a/usr/src/lib/varpd/libvarpd/sparc/Makefile b/usr/src/lib/varpd/libvarpd/sparc/Makefile new file mode 100644 index 0000000000..f2b4f63da5 --- /dev/null +++ b/usr/src/lib/varpd/libvarpd/sparc/Makefile @@ -0,0 +1,18 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com + +install: all $(ROOTLIBS) $(ROOTLINKS) $(ROOTLINT) diff --git a/usr/src/lib/varpd/libvarpd/sparcv9/Makefile b/usr/src/lib/varpd/libvarpd/sparcv9/Makefile new file mode 100644 index 0000000000..d552642882 --- /dev/null +++ b/usr/src/lib/varpd/libvarpd/sparcv9/Makefile @@ -0,0 +1,19 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com +include ../../../Makefile.lib.64 + +install: all $(ROOTLIBS64) $(ROOTLINKS64) $(ROOTLINT64) diff --git a/usr/src/lib/varpd/svp/Makefile b/usr/src/lib/varpd/svp/Makefile new file mode 100644 index 0000000000..48417965b4 --- /dev/null +++ b/usr/src/lib/varpd/svp/Makefile @@ -0,0 +1,42 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. 
+# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2018 Joyent, Inc. +# + +include ../../Makefile.lib + +HDRS = libvarpd_svp_prot.h # For snoop +HDRDIR = common +SUBDIRS = $(MACH) +$(BUILD64)SUBDIRS += $(MACH64) + +all := TARGET = all +clean := TARGET = clean +clobber := TARGET = clobber +install := TARGET = install + +.KEEP_STATE: + +all clean clobber install: $(SUBDIRS) + +install: install_h +install_h: $(ROOTHDRS) + +check: + +$(SUBDIRS): FRC + @cd $@; pwd; $(MAKE) $(TARGET) + +FRC: + +include ../../Makefile.targ diff --git a/usr/src/lib/varpd/svp/Makefile.com b/usr/src/lib/varpd/svp/Makefile.com new file mode 100644 index 0000000000..91baacb430 --- /dev/null +++ b/usr/src/lib/varpd/svp/Makefile.com @@ -0,0 +1,52 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2020 Joyent, Inc. +# + +LIBRARY = libvarpd_svp.a +VERS = .1 +OBJECTS = libvarpd_svp.o \ + libvarpd_svp_conn.o \ + libvarpd_svp_crc.o \ + libvarpd_svp_host.o \ + libvarpd_svp_loop.o \ + libvarpd_svp_remote.o \ + libvarpd_svp_shootdown.o \ + libvarpd_svp_timer.o + +include ../../../Makefile.lib +include ../../Makefile.plugin + +LIBS = $(DYNLIB) + +# +# Yes, this isn't a command, but libcmdutils does have the list(9F) +# functions and better to use that then compile list.o yet again +# ourselves... probably. 
+# +LDLIBS += -lc -lumem -lnvpair -lsocket -lavl \ + -lcmdutils -lidspace -lbunyan +CPPFLAGS += -I../common + +LINTFLAGS += -erroff=E_BAD_PTR_CAST_ALIGN +LINTFLAGS64 += -erroff=E_BAD_PTR_CAST_ALIGN +SRCDIR = ../common + +C99MODE= -xc99=%all +C99LMODE= -Xc99=%all + +.KEEP_STATE: + +all: $(LIBS) + +include ../../../Makefile.targ diff --git a/usr/src/lib/varpd/svp/amd64/Makefile b/usr/src/lib/varpd/svp/amd64/Makefile new file mode 100644 index 0000000000..d552642882 --- /dev/null +++ b/usr/src/lib/varpd/svp/amd64/Makefile @@ -0,0 +1,19 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com +include ../../../Makefile.lib.64 + +install: all $(ROOTLIBS64) $(ROOTLINKS64) $(ROOTLINT64) diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp.c b/usr/src/lib/varpd/svp/common/libvarpd_svp.c new file mode 100644 index 0000000000..2483cb6214 --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp.c @@ -0,0 +1,1140 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015, Joyent, Inc. + * Copyright 2022 MNX Cloud, Inc. + */ + +/* + * This plugin implements the SDC VXLAN Protocol (SVP). 
+ * + * This plugin is designed to work with a broader distributed system that + * mainains a database of mappings and provides a means of looking up data and + * provides a stream of updates. While it is named after VXLAN, there isn't + * anything specific to VXLAN baked into the protocol at this time, other than + * that it requires both an IP address and a port; however, if there's a good + * reason to support others here, we can modify that. + * + * ----------- + * Terminology + * ----------- + * + * Throughout this module we refer to a few different kinds of addresses: + * + * VL3 + * + * A VL3 address, or virtual layer 3, refers to the layer three addreses + * that are used by entities on an overlay network. As far as we're + * concerned that means that this is the IP address of an interface on an + * overlay network. + * + * VL2 + * + * A VL2 address, or a virtual layer 2, referes to the link-layer addresses + * that are used by entities on an overlay network. As far as we're + * concerned that means that this is the MAC addresses of an interface on + * an overlay network. + * + * UL3 + * + * A UL3, or underlay layer 3, refers to the layer three (IP) address on + * the underlay network. + * + * The svp plugin provides lookups from VL3->VL2, eg. the equivalent of an ARP + * or NDP query, and then also provides VL2->UL3 lookups. + * + * ------------------- + * Protocol Operations + * ------------------- + * + * The svp protocol is defined in lib/varpd/svp/common/libvarpd_svp_prot.h. It + * defines the basic TCP protocol that we use to communicate to hosts. At this + * time, it is not quite 100% implemented in both this plug-in and our primary + * server, sdc-portolan (see https://github.com/joyent/sdc-portolan). + * + * At this time, we don't quite support everything that we need to. Including + * the SVP_R_BULK_REQ and SVP_R_SHOOTDOWN. 
+ * + * --------------------------------- + * General Design and Considerations + * --------------------------------- + * + * Every instance of the svp plugin requires the hostname and port of a server + * to contact. Though, we have co-opted the port 1296 (the year of the oldest + * extant portolan) as our default port. + * + * Each of the different instance of the plugins has a corresponding remote + * backend. The remote backend represents the tuple of the [ host, port ]. + * Different instances that share the same host and port tuple will use the same + * backend. + * + * The backend is actually in charge of performing lookups, resolving and + * updating the set of remote hosts based on the DNS resolution we've been + * provided, and taking care of things like shootdowns. + * + * The whole plugin itself maintains an event loop and a number of threads to + * service that event loop. On top of that event loop, we have a simple timer + * backend that ticks at one second intervals and performs various callbacks, + * such as idle query timers, DNS resolution, connection backoff, etc. Each of + * the remote hosts that we obtain is wrapped up in an svp_conn_t, which manages + * the connection state, reconnecting, etc. + * + * All in all, the general way that this all looks like is: + * + * +----------------------------+ + * | Plugin Instance | + * | svp_t | + * | | + * | varpd_provider_handle_t * -+-> varpd handle + * | uint64_t ----+-> varpd ID + * | char * ----+-> remote host + * | uint16_t ----+-> remote port + * | svp_remote_t * ---+------+-> remote backend + * +---------------------+------+ + * | + * v + * +----------------------+ +----------------+ + * | Remote backend |------------------>| Remove Backend |---> ... 
+ * | svp_remote_t | | svp_remote_t | + * | | +----------------+ + * | svp_remote_state_t --+-> state flags + * | svp_degrade_state_t -+-> degraded reason + * | struct addrinfo * --+-> resolved hosts + * | uint_t ---+-> active hosts + * | uint_t ---+-> DNS generation + * | uint_t ---+-> Reference count + * | uint_t ---+-> active conns + * | uint_t ---+-> degraded conns + * | list_t ---+---+-> connection list + * +------------------+---+ + * | + * +------------------------------+-----------------+ + * | | | + * v v v + * +-------------------+ +---------------- + * | SVP Connection | | SVP connection | ... + * | svp_conn_t | | svp_conn_t | + * | | +----------------+ + * | svp_event_t ----+-> event loop handle + * | svp_timer_t ----+-> backoff timer + * | svp_timer_t ----+-> query timer + * | int ----+-> socket fd + * | uint_t ----+-> generation + * | uint_t ----+-> current backoff + * | svp_conn_flags_t -+-> connection flags + * | svp_conn_state_t -+-> connection state + * | svp_conn_error_t -+-> connection error + * | int ---+-> last errrno + * | hrtime_t ---+-> activity timestamp + * | svp_conn_out_t ---+-> outgoing data state + * | svp_conn_in_t ---+-> incoming data state + * | list_t ---+--+-> active queries + * +----------------+--+ + * | + * +----------------------------------+-----------------+ + * | | | + * v v v + * +--------------------+ +-------------+ + * | SVP Query | | SVP Query | ... + * | svp_query_t | | svp_query_t | + * | | +-------------+ + * | svp_query_f ---+-> callback function + * | void * ---+-> callback arg + * | svp_query_state_t -+-> state flags + * | svp_req_t ---+-> svp prot. header + * | svp_query_data_t --+-> read data + * | svp_query_data_t --+-> write data + * | svp_status_t ---+-> request status + * +--------------------+ + * + * The svp_t is the instance that we assoicate with varpd. The instance itself + * maintains properties and then when it's started associates with an + * svp_remote_t, which is the remote backend. 
The remote backend itself, + * maintains the DNS state and spins up and downs connections based on the + * results from DNS. By default, we query DNS every 30 seconds. For more on the + * connection life cycle, see the next section. + * + * By default, each connection maintains its own back off timer and list of + * queries it's servicing. Only one request is generally outstanding at a time + * and requests are round robined across the various connections. + * + * The query itself represents the svp request that's going on and keep track of + * its state and is a place for data that's read and written to as part of the + * request. + * + * Connections maintain a query timer such that if we have not received data on + * a socket for a certain amount of time, we kill that socket and begin a + * reconnection cycle with backoff. + * + * ------------------------ + * Connection State Machine + * ------------------------ + * + * We have a connection pool that's built upon DNS records. DNS describes the + * membership of the set of remote peers that make up our pool and we maintain + * one connection to each of them. In addition, we maintain an exponential + * backoff for each peer and will attempt to reconect immediately before backing + * off. The following are the valid states that a connection can be in: + * + * SVP_CS_ERROR An OS error has occurred on this connection, + * such as failure to create a socket or associate + * the socket with an event port. We also + * transition all connections to this state before + * we destroy them. + * + * SVP_CS_INITIAL This is the initial state of a connection, all + * that should exist is an unbound socket. + * + * SVP_CS_CONNECTING A call to connect has been made and we are + * polling for it to complete. + * + * SVP_CS_BACKOFF A connect attempt has failed and we are + * currently backing off, waiting to try again. + * + * SVP_CS_ACTIVE We have successfully connected to the remote + * system. 
+ * + * SVP_CS_WINDDOWN This connection is going to valhalla. In other + * words, a previously active connection is no + * longer valid in DNS, so we should curb our use + * of it, and reap it as soon as we have other + * active connections. + * + * The following diagram attempts to describe our state transition scheme, and + * when we transition from one state to the next. + * + * | + * * New remote IP from DNS resolution, + * | not currently active in the system. + * | + * v Socket Error, + * +----------------+ still in DNS + * +----------------<---| SVP_CS_INITIAL |<----------------------*-----+ + * | +----------------+ | + * | System | | + * | Connection . . . . . success * Successful | + * | failed . | connect() | + * | +----*---------+ | +-----------*--+ | + * | | | | | | | + * | V ^ v ^ V ^ + * | +----------------+ +-------------------+ +---------------+ + * +<-| SVP_CS_BACKOFF | | SVP_CS_CONNECTING | | SVP_CS_ACTIVE | + * | +----------------+ +-------------------+ +---------------+ + * | V ^ V V V + * | Backoff wait * | | | * Removed + * v interval +--------------+ +-----------------<-----+ | from DNS + * | finished | | + * | V | + * | | V + * | | +-----------------+ + * +----------------+----------<-----+-------<----| SVP_CS_WINDDOWN | + * | +-----------------+ + * * . . . Fatal system, not + * | socket error or + * V quiesced after + * +--------------+ removal from DNS + * | SVP_CS_ERROR | + * +--------------+ + * | + * * . . . Removed from DNS + * v + * +------------+ + * | Connection | + * | Destroyed | + * +------------+ + * + * -------------------------- + * Connection Event Injection + * -------------------------- + * + * For each connection that exists in the system, we have a timer in place that + * is in charge of performing timeout activity. It fires once every thirty + * seconds or so for a given connection and checks to ensure that we have had + * activity for the most recent query on the connection. If not, it terminates + * the connection. 
This is important as if we have sent all our data and are + * waiting for the remote end to reply, without enabling something like TCP + * keep-alive, we will not be notified that anything that has happened to the + * remote connection, for example a panic. In addition, this also protects + * against a server that is up, but a portolan that is not making forward + * progress. + * + * When a timeout occurs, we first try to disassociate any active events, which + * by definition must exist. Once that's done, we inject a port source user + * event. Now, there is a small gotcha. Let's assume for a moment that we have a + * pathological portolan. That means that it knows to inject activity right at + * the time out window. That means, that the event may be disassociated before + * we could get to it. If that's the case, we must _not_ inject the user event + * and instead, we'll let the pending event take care of it. We know that the + * pending event hasn't hit the main part of the loop yet, otherwise, it would + * have released the lock protecting our state and associated the event. + * + * ------------ + * Notes on DNS + * ------------ + * + * Unfortunately, doing host name resolution in a way that allows us to leverage + * the system's resolvers and the system's caching, require us to make blocking + * calls in libc via getaddrinfo(3SOCKET). If we can't reach a given server, + * that will tie up a thread for quite some time. To work around that fact, + * we're going to create a fixed number of threads and we'll use them to service + * our DNS requests. While this isn't ideal, until we have a sane means of + * integrating a DNS resolution into an event loop with say portfs, it's not + * going to be a fun day no matter what we do. + * + * ------ + * Timers + * ------ + * + * We maintain a single timer based on CLOCK_REALTIME. It's designed to fire + * every second. 
While we'd rather use CLOCK_HIGHRES just to alleviate ourselves + * from timer drift; however, as zones may not actually have CLOCK_HIGHRES + * access, we don't want them to end up in there. The timer itself is just a + * simple avl tree sorted by expiration time, which is stored as a tick in the + * future, a tick is just one second. + * + * ---------- + * Shootdowns + * ---------- + * + * As part of the protocol, we need to be able to handle shootdowns that inform + * us some of the information in the system is out of date. This information + * needs to be processed promptly; however, the information is hopefully going + * to be relatively infrequent relative to the normal flow of information. + * + * The shoot down information needs to be done on a per-backend basis. The + * general design is that we'll have a single query for this which can fire on a + * 5-10s period, we randmoize the latter part to give us a bit more load + * spreading. If we complete because there's no work to do, then we wait the + * normal period. If we complete, but there's still work to do, we'll go again + * after a second. + * + * A shootdown has a few different parts. We first receive a list of items to + * shootdown. After performing all of those, we need to acknowledge them. When + * that's been done successfully, we can move onto the next part. From a + * protocol perspective, we make a SVP_R_LOG_REQ, we get a reply, and then after + * processing them, send an SVP_R_LOG_RM. Only once that's been acked do we + * continue. + * + * However, one of the challenges that we have is that these invalidations are + * just that, an invalidation. For a virtual layer two request, that's fine, + * because the kernel supports that. However, for virtual layer three + * invalidations, we have a bit more work to do. These protocols, ARP and NDP, + * don't really support a notion of just an invalidation, instead you have to + * inject the new data in a gratuitous fashion. 
+ * + * To that end, what we instead do is when we receive a VL3 invalidation, we + * turn that into a VL3 request. We hold the general request as outstanding + * until we receive all of the callbacks for the VL3 invalidations, at which + * point we go through and do the log removal request. + */ + +#include <umem.h> +#include <errno.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <libnvpair.h> +#include <strings.h> +#include <string.h> +#include <assert.h> +#include <unistd.h> + +#include <libvarpd_provider.h> +#include "libvarpd_svp.h" + +bunyan_logger_t *svp_bunyan; +static int svp_defport = 1296; +static int svp_defuport = 1339; +static umem_cache_t *svp_lookup_cache; + +typedef enum svp_lookup_type { + SVP_L_UNKNOWN = 0x0, + SVP_L_VL2 = 0x1, + SVP_L_VL3 = 0x2 +} svp_lookup_type_t; + +typedef struct svp_lookup { + int svl_type; + union { + struct svl_lookup_vl2 { + varpd_query_handle_t *svl_handle; + overlay_target_point_t *svl_point; + } svl_vl2; + struct svl_lookup_vl3 { + varpd_arp_handle_t *svl_vah; + uint8_t *svl_out; + } svl_vl3; + } svl_u; + svp_query_t svl_query; +} svp_lookup_t; + +static const char *varpd_svp_props[] = { + "svp/host", + "svp/port", + "svp/underlay_ip", + "svp/underlay_port" +}; + +static const uint8_t svp_bcast[6] = { 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }; + +int +svp_comparator(const void *l, const void *r) +{ + const svp_t *ls = l; + const svp_t *rs = r; + + if (ls->svp_vid > rs->svp_vid) + return (1); + if (ls->svp_vid < rs->svp_vid) + return (-1); + return (0); +} + +static void +svp_vl2_lookup_cb(svp_t *svp, svp_status_t status, const struct in6_addr *uip, + const uint16_t uport, void *arg) +{ + svp_lookup_t *svl = arg; + overlay_target_point_t *otp; + + assert(svp != NULL); + assert(arg != NULL); + + if (status != SVP_S_OK) { + libvarpd_plugin_query_reply(svl->svl_u.svl_vl2.svl_handle, + VARPD_LOOKUP_DROP); + umem_cache_free(svp_lookup_cache, 
svl); + return; + } + + otp = svl->svl_u.svl_vl2.svl_point; + bcopy(uip, &otp->otp_ip, sizeof (struct in6_addr)); + otp->otp_port = uport; + libvarpd_plugin_query_reply(svl->svl_u.svl_vl2.svl_handle, + VARPD_LOOKUP_OK); + umem_cache_free(svp_lookup_cache, svl); +} + +static void +svp_vl3_lookup_cb(svp_t *svp, svp_status_t status, const uint8_t *vl2mac, + const struct in6_addr *uip, const uint16_t uport, void *arg) +{ + overlay_target_point_t point; + svp_lookup_t *svl = arg; + + assert(svp != NULL); + assert(svl != NULL); + + if (status != SVP_S_OK) { + libvarpd_plugin_arp_reply(svl->svl_u.svl_vl3.svl_vah, + VARPD_LOOKUP_DROP); + umem_cache_free(svp_lookup_cache, svl); + return; + } + + /* Inject the L2 mapping before the L3 */ + bcopy(uip, &point.otp_ip, sizeof (struct in6_addr)); + point.otp_port = uport; + libvarpd_inject_varp(svp->svp_hdl, vl2mac, &point); + + bcopy(vl2mac, svl->svl_u.svl_vl3.svl_out, ETHERADDRL); + libvarpd_plugin_arp_reply(svl->svl_u.svl_vl3.svl_vah, + VARPD_LOOKUP_OK); + umem_cache_free(svp_lookup_cache, svl); +} + +static void +svp_vl2_invalidate_cb(svp_t *svp, const uint8_t *vl2mac) +{ + libvarpd_inject_varp(svp->svp_hdl, vl2mac, NULL); +} + +static void +svp_vl3_inject_cb(svp_t *svp, const uint16_t vlan, const struct in6_addr *vl3ip, + const uint8_t *vl2mac, const uint8_t *targmac) +{ + struct in_addr v4; + + /* + * At the moment we don't support any IPv6 related log entries, this + * will change soon as we develop a bit more of the IPv6 related + * infrastructure so we can properly test the injection. + */ + if (IN6_IS_ADDR_V4MAPPED(vl3ip) == 0) { + return; + } else { + IN6_V4MAPPED_TO_INADDR(vl3ip, &v4); + if (targmac == NULL) + targmac = svp_bcast; + libvarpd_inject_arp(svp->svp_hdl, vlan, vl2mac, &v4, targmac); + } +} + +/* ARGSUSED */ +static void +svp_shootdown_cb(svp_t *svp, const uint8_t *vl2mac, const struct in6_addr *uip, + const uint16_t uport) +{ + /* + * We should probably do a conditional invalidation here. 
+ */ + libvarpd_inject_varp(svp->svp_hdl, vl2mac, NULL); +} + +static svp_cb_t svp_defops = { + svp_vl2_lookup_cb, + svp_vl3_lookup_cb, + svp_vl2_invalidate_cb, + svp_vl3_inject_cb, + svp_shootdown_cb +}; + +static boolean_t +varpd_svp_valid_dest(overlay_plugin_dest_t dest) +{ + if (dest != (OVERLAY_PLUGIN_D_IP | OVERLAY_PLUGIN_D_PORT)) + return (B_FALSE); + + return (B_TRUE); +} + +static int +varpd_svp_create(varpd_provider_handle_t *hdl, void **outp, + overlay_plugin_dest_t dest) +{ + int ret; + svp_t *svp; + + if (varpd_svp_valid_dest(dest) == B_FALSE) + return (ENOTSUP); + + svp = umem_zalloc(sizeof (svp_t), UMEM_DEFAULT); + if (svp == NULL) + return (ENOMEM); + + if ((ret = mutex_init(&svp->svp_lock, USYNC_THREAD | LOCK_ERRORCHECK, + NULL)) != 0) { + umem_free(svp, sizeof (svp_t)); + return (ret); + } + + svp->svp_port = svp_defport; + svp->svp_uport = svp_defuport; + svp->svp_cb = svp_defops; + svp->svp_hdl = hdl; + svp->svp_vid = libvarpd_plugin_vnetid(svp->svp_hdl); + *outp = svp; + return (0); +} + +static int +varpd_svp_start(void *arg) +{ + int ret; + svp_remote_t *srp; + svp_t *svp = arg; + + mutex_enter(&svp->svp_lock); + if (svp->svp_host == NULL || svp->svp_port == 0 || + svp->svp_huip == B_FALSE || svp->svp_uport == 0) { + mutex_exit(&svp->svp_lock); + return (EAGAIN); + } + mutex_exit(&svp->svp_lock); + + if ((ret = svp_remote_find(svp->svp_host, svp->svp_port, &svp->svp_uip, + &srp)) != 0) + return (ret); + + if ((ret = svp_remote_attach(srp, svp)) != 0) { + svp_remote_release(srp); + return (ret); + } + + return (0); +} + +static void +varpd_svp_stop(void *arg) +{ + svp_t *svp = arg; + + svp_remote_detach(svp); +} + +static void +varpd_svp_destroy(void *arg) +{ + svp_t *svp = arg; + + if (svp->svp_host != NULL) + umem_free(svp->svp_host, strlen(svp->svp_host) + 1); + + if (mutex_destroy(&svp->svp_lock) != 0) + libvarpd_panic("failed to destroy svp_t`svp_lock"); + + umem_free(svp, sizeof (svp_t)); +} + +static void +varpd_svp_lookup(void *arg, 
varpd_query_handle_t *vqh, + const overlay_targ_lookup_t *otl, overlay_target_point_t *otp) +{ + svp_lookup_t *slp; + svp_t *svp = arg; + + /* + * Check if this is something that we need to proxy, eg. arp or ndp. + */ + if (otl->otl_sap == ETHERTYPE_ARP) { + libvarpd_plugin_proxy_arp(svp->svp_hdl, vqh, otl); + return; + } + + if (otl->otl_dstaddr[0] == 0x33 && + otl->otl_dstaddr[1] == 0x33) { + if (otl->otl_sap == ETHERTYPE_IPV6) { + libvarpd_plugin_proxy_ndp(svp->svp_hdl, vqh, otl); + } else { + libvarpd_plugin_query_reply(vqh, VARPD_LOOKUP_DROP); + } + return; + } + + /* + * Watch out for various multicast and broadcast addresses. We've + * already taken care of the IPv6 range above. Now we just need to + * handle broadcast and if the multicast bit is set, lowest bit of the + * first octet of the MAC, then we drop it now. + */ + if (bcmp(otl->otl_dstaddr, svp_bcast, ETHERADDRL) == 0 || + (otl->otl_dstaddr[0] & 0x01) == 0x01) { + libvarpd_plugin_query_reply(vqh, VARPD_LOOKUP_DROP); + return; + } + + /* + * If we have a failure to allocate memory for this, that's not good. + * However, telling the kernel to just drop this packet is much better + * than the alternative at this moment. At least we'll try again and we + * may have something more available to us in a little bit. 
+ */ + slp = umem_cache_alloc(svp_lookup_cache, UMEM_DEFAULT); + if (slp == NULL) { + libvarpd_plugin_query_reply(vqh, VARPD_LOOKUP_DROP); + return; + } + + slp->svl_type = SVP_L_VL2; + slp->svl_u.svl_vl2.svl_handle = vqh; + slp->svl_u.svl_vl2.svl_point = otp; + + svp_remote_vl2_lookup(svp, &slp->svl_query, otl->otl_dstaddr, slp); +} + +/* ARGSUSED */ +static int +varpd_svp_nprops(void *arg, uint_t *nprops) +{ + *nprops = sizeof (varpd_svp_props) / sizeof (char *); + return (0); +} + +/* ARGSUSED */ +static int +varpd_svp_propinfo(void *arg, uint_t propid, varpd_prop_handle_t *vph) +{ + switch (propid) { + case 0: + /* svp/host */ + libvarpd_prop_set_name(vph, varpd_svp_props[0]); + libvarpd_prop_set_prot(vph, OVERLAY_PROP_PERM_RRW); + libvarpd_prop_set_type(vph, OVERLAY_PROP_T_STRING); + libvarpd_prop_set_nodefault(vph); + break; + case 1: + /* svp/port */ + libvarpd_prop_set_name(vph, varpd_svp_props[1]); + libvarpd_prop_set_prot(vph, OVERLAY_PROP_PERM_RRW); + libvarpd_prop_set_type(vph, OVERLAY_PROP_T_UINT); + (void) libvarpd_prop_set_default(vph, &svp_defport, + sizeof (svp_defport)); + libvarpd_prop_set_range_uint32(vph, 1, UINT16_MAX); + break; + case 2: + /* svp/underlay_ip */ + libvarpd_prop_set_name(vph, varpd_svp_props[2]); + libvarpd_prop_set_prot(vph, OVERLAY_PROP_PERM_RRW); + libvarpd_prop_set_type(vph, OVERLAY_PROP_T_IP); + libvarpd_prop_set_nodefault(vph); + break; + case 3: + /* svp/underlay_port */ + libvarpd_prop_set_name(vph, varpd_svp_props[3]); + libvarpd_prop_set_prot(vph, OVERLAY_PROP_PERM_RRW); + libvarpd_prop_set_type(vph, OVERLAY_PROP_T_UINT); + (void) libvarpd_prop_set_default(vph, &svp_defuport, + sizeof (svp_defuport)); + libvarpd_prop_set_range_uint32(vph, 1, UINT16_MAX); + break; + default: + return (EINVAL); + } + return (0); +} + +static int +varpd_svp_getprop(void *arg, const char *pname, void *buf, uint32_t *sizep) +{ + svp_t *svp = arg; + + /* svp/host */ + if (strcmp(pname, varpd_svp_props[0]) == 0) { + size_t len; + + 
mutex_enter(&svp->svp_lock); + if (svp->svp_host == NULL) { + *sizep = 0; + } else { + len = strlen(svp->svp_host) + 1; + if (*sizep < len) { + mutex_exit(&svp->svp_lock); + return (EOVERFLOW); + } + *sizep = len; + (void) strlcpy(buf, svp->svp_host, *sizep); + } + mutex_exit(&svp->svp_lock); + return (0); + } + + /* svp/port */ + if (strcmp(pname, varpd_svp_props[1]) == 0) { + uint64_t val; + + if (*sizep < sizeof (uint64_t)) + return (EOVERFLOW); + + mutex_enter(&svp->svp_lock); + if (svp->svp_port == 0) { + *sizep = 0; + } else { + val = svp->svp_port; + bcopy(&val, buf, sizeof (uint64_t)); + *sizep = sizeof (uint64_t); + } + + mutex_exit(&svp->svp_lock); + return (0); + } + + /* svp/underlay_ip */ + if (strcmp(pname, varpd_svp_props[2]) == 0) { + if (*sizep > sizeof (struct in6_addr)) + return (EOVERFLOW); + mutex_enter(&svp->svp_lock); + if (svp->svp_huip == B_FALSE) { + *sizep = 0; + } else { + bcopy(&svp->svp_uip, buf, sizeof (struct in6_addr)); + *sizep = sizeof (struct in6_addr); + } + mutex_exit(&svp->svp_lock); + return (0); + } + + /* svp/underlay_port */ + if (strcmp(pname, varpd_svp_props[3]) == 0) { + uint64_t val; + + if (*sizep < sizeof (uint64_t)) + return (EOVERFLOW); + + mutex_enter(&svp->svp_lock); + if (svp->svp_uport == 0) { + *sizep = 0; + } else { + val = svp->svp_uport; + bcopy(&val, buf, sizeof (uint64_t)); + *sizep = sizeof (uint64_t); + } + + mutex_exit(&svp->svp_lock); + return (0); + } + + return (EINVAL); +} + +static int +varpd_svp_setprop(void *arg, const char *pname, const void *buf, + const uint32_t size) +{ + svp_t *svp = arg; + + /* svp/host */ + if (strcmp(pname, varpd_svp_props[0]) == 0) { + char *dup; + dup = umem_alloc(size, UMEM_DEFAULT); + (void) strlcpy(dup, buf, size); + if (dup == NULL) + return (ENOMEM); + mutex_enter(&svp->svp_lock); + if (svp->svp_host != NULL) + umem_free(svp->svp_host, strlen(svp->svp_host) + 1); + svp->svp_host = dup; + mutex_exit(&svp->svp_lock); + return (0); + } + + /* svp/port */ + if 
(strcmp(pname, varpd_svp_props[1]) == 0) { + const uint64_t *valp = buf; + if (size < sizeof (uint64_t)) + return (EOVERFLOW); + + if (*valp == 0 || *valp > UINT16_MAX) + return (EINVAL); + + mutex_enter(&svp->svp_lock); + svp->svp_port = (uint16_t)*valp; + mutex_exit(&svp->svp_lock); + return (0); + } + + /* svp/underlay_ip */ + if (strcmp(pname, varpd_svp_props[2]) == 0) { + const struct in6_addr *ipv6 = buf; + + if (size < sizeof (struct in6_addr)) + return (EOVERFLOW); + + if (IN6_IS_ADDR_V4COMPAT(ipv6)) + return (EINVAL); + + if (IN6_IS_ADDR_MULTICAST(ipv6)) + return (EINVAL); + + if (IN6_IS_ADDR_6TO4(ipv6)) + return (EINVAL); + + if (IN6_IS_ADDR_V4MAPPED(ipv6)) { + ipaddr_t v4; + IN6_V4MAPPED_TO_IPADDR(ipv6, v4); + if (IN_MULTICAST(ntohl(v4))) + return (EINVAL); + } + + mutex_enter(&svp->svp_lock); + bcopy(buf, &svp->svp_uip, sizeof (struct in6_addr)); + svp->svp_huip = B_TRUE; + mutex_exit(&svp->svp_lock); + return (0); + } + + /* svp/underlay_port */ + if (strcmp(pname, varpd_svp_props[3]) == 0) { + const uint64_t *valp = buf; + if (size < sizeof (uint64_t)) + return (EOVERFLOW); + + if (*valp == 0 || *valp > UINT16_MAX) + return (EINVAL); + + mutex_enter(&svp->svp_lock); + svp->svp_uport = (uint16_t)*valp; + mutex_exit(&svp->svp_lock); + + return (0); + } + + return (EINVAL); +} + +static int +varpd_svp_save(void *arg, nvlist_t *nvp) +{ + int ret; + svp_t *svp = arg; + + mutex_enter(&svp->svp_lock); + if (svp->svp_host != NULL) { + if ((ret = nvlist_add_string(nvp, varpd_svp_props[0], + svp->svp_host)) != 0) { + mutex_exit(&svp->svp_lock); + return (ret); + } + } + + if (svp->svp_port != 0) { + if ((ret = nvlist_add_uint16(nvp, varpd_svp_props[1], + svp->svp_port)) != 0) { + mutex_exit(&svp->svp_lock); + return (ret); + } + } + + if (svp->svp_huip == B_TRUE) { + char buf[INET6_ADDRSTRLEN]; + + if (inet_ntop(AF_INET6, &svp->svp_uip, buf, sizeof (buf)) == + NULL) + libvarpd_panic("unexpected inet_ntop failure: %d", + errno); + + if ((ret = 
nvlist_add_string(nvp, varpd_svp_props[2], + buf)) != 0) { + mutex_exit(&svp->svp_lock); + return (ret); + } + } + + if (svp->svp_uport != 0) { + if ((ret = nvlist_add_uint16(nvp, varpd_svp_props[3], + svp->svp_uport)) != 0) { + mutex_exit(&svp->svp_lock); + return (ret); + } + } + + mutex_exit(&svp->svp_lock); + return (0); +} + +static int +varpd_svp_restore(nvlist_t *nvp, varpd_provider_handle_t *hdl, + overlay_plugin_dest_t dest, void **outp) +{ + int ret; + svp_t *svp; + char *ipstr, *hstr; + + if (varpd_svp_valid_dest(dest) == B_FALSE) + return (ENOTSUP); + + if ((ret = varpd_svp_create(hdl, (void **)&svp, dest)) != 0) + return (ret); + + if ((ret = nvlist_lookup_string(nvp, varpd_svp_props[0], + &hstr)) != 0) { + if (ret != ENOENT) { + varpd_svp_destroy(svp); + return (ret); + } + svp->svp_host = NULL; + } else { + size_t blen = strlen(hstr) + 1; + svp->svp_host = umem_alloc(blen, UMEM_DEFAULT); + (void) strlcpy(svp->svp_host, hstr, blen); + } + + if ((ret = nvlist_lookup_uint16(nvp, varpd_svp_props[1], + &svp->svp_port)) != 0) { + if (ret != ENOENT) { + varpd_svp_destroy(svp); + return (ret); + } + svp->svp_port = 0; + } + + if ((ret = nvlist_lookup_string(nvp, varpd_svp_props[2], + &ipstr)) != 0) { + if (ret != ENOENT) { + varpd_svp_destroy(svp); + return (ret); + } + svp->svp_huip = B_FALSE; + } else { + ret = inet_pton(AF_INET6, ipstr, &svp->svp_uip); + if (ret == -1) { + assert(errno == EAFNOSUPPORT); + libvarpd_panic("unexpected inet_pton failure: %d", + errno); + } + + if (ret == 0) { + varpd_svp_destroy(svp); + return (EINVAL); + } + svp->svp_huip = B_TRUE; + } + + if ((ret = nvlist_lookup_uint16(nvp, varpd_svp_props[3], + &svp->svp_uport)) != 0) { + if (ret != ENOENT) { + varpd_svp_destroy(svp); + return (ret); + } + svp->svp_uport = 0; + } + + svp->svp_hdl = hdl; + *outp = svp; + return (0); +} + +static void +varpd_svp_arp(void *arg, varpd_arp_handle_t *vah, int type, + const struct sockaddr *sock, uint8_t *out) +{ + svp_t *svp = arg; + 
svp_lookup_t *svl; + + if (type != VARPD_QTYPE_ETHERNET) { + libvarpd_plugin_arp_reply(vah, VARPD_LOOKUP_DROP); + return; + } + + svl = umem_cache_alloc(svp_lookup_cache, UMEM_DEFAULT); + if (svl == NULL) { + libvarpd_plugin_arp_reply(vah, VARPD_LOOKUP_DROP); + return; + } + + svl->svl_type = SVP_L_VL3; + svl->svl_u.svl_vl3.svl_vah = vah; + svl->svl_u.svl_vl3.svl_out = out; + svp_remote_vl3_lookup(svp, &svl->svl_query, sock, svl); +} + +static const varpd_plugin_ops_t varpd_svp_ops = { + 0, + varpd_svp_create, + varpd_svp_start, + varpd_svp_stop, + varpd_svp_destroy, + NULL, + varpd_svp_lookup, + varpd_svp_nprops, + varpd_svp_propinfo, + varpd_svp_getprop, + varpd_svp_setprop, + varpd_svp_save, + varpd_svp_restore, + varpd_svp_arp, + NULL +}; + +static int +svp_bunyan_init(void) +{ + int ret; + + if ((ret = bunyan_init("svp", &svp_bunyan)) != 0) + return (ret); + ret = bunyan_stream_add(svp_bunyan, "stderr", BUNYAN_L_INFO, + bunyan_stream_fd, (void *)STDERR_FILENO); + if (ret != 0) + bunyan_fini(svp_bunyan); + return (ret); +} + +static void +svp_bunyan_fini(void) +{ + if (svp_bunyan != NULL) + bunyan_fini(svp_bunyan); +} + +#pragma init(varpd_svp_init) +static void +varpd_svp_init(void) +{ + int err; + varpd_plugin_register_t *vpr; + + if (svp_bunyan_init() != 0) + return; + + if ((err = svp_host_init()) != 0) { + (void) bunyan_error(svp_bunyan, "failed to init host subsystem", + BUNYAN_T_INT32, "error", err, + BUNYAN_T_END); + svp_bunyan_fini(); + return; + } + + svp_lookup_cache = umem_cache_create("svp_lookup", + sizeof (svp_lookup_t), 0, NULL, NULL, NULL, NULL, NULL, 0); + if (svp_lookup_cache == NULL) { + (void) bunyan_error(svp_bunyan, + "failed to create svp_lookup cache", + BUNYAN_T_INT32, "error", errno, + BUNYAN_T_END); + svp_bunyan_fini(); + return; + } + + if ((err = svp_event_init()) != 0) { + (void) bunyan_error(svp_bunyan, + "failed to init event subsystem", + BUNYAN_T_INT32, "error", err, + BUNYAN_T_END); + svp_bunyan_fini(); + 
umem_cache_destroy(svp_lookup_cache); + return; + } + + if ((err = svp_timer_init()) != 0) { + (void) bunyan_error(svp_bunyan, + "failed to init timer subsystem", + BUNYAN_T_INT32, "error", err, + BUNYAN_T_END); + svp_event_fini(); + umem_cache_destroy(svp_lookup_cache); + svp_bunyan_fini(); + return; + } + + if ((err = svp_remote_init()) != 0) { + (void) bunyan_error(svp_bunyan, + "failed to init remote subsystem", + BUNYAN_T_INT32, "error", err, + BUNYAN_T_END); + svp_event_fini(); + umem_cache_destroy(svp_lookup_cache); + svp_bunyan_fini(); + return; + } + + vpr = libvarpd_plugin_alloc(VARPD_CURRENT_VERSION, &err); + if (vpr == NULL) { + (void) bunyan_error(svp_bunyan, + "failed to alloc varpd plugin", + BUNYAN_T_INT32, "error", err, + BUNYAN_T_END); + svp_remote_fini(); + svp_event_fini(); + umem_cache_destroy(svp_lookup_cache); + svp_bunyan_fini(); + return; + } + + vpr->vpr_mode = OVERLAY_TARGET_DYNAMIC; + vpr->vpr_name = "svp"; + vpr->vpr_ops = &varpd_svp_ops; + + if ((err = libvarpd_plugin_register(vpr)) != 0) { + (void) bunyan_error(svp_bunyan, + "failed to register varpd plugin", + BUNYAN_T_INT32, "error", err, + BUNYAN_T_END); + svp_remote_fini(); + svp_event_fini(); + umem_cache_destroy(svp_lookup_cache); + svp_bunyan_fini(); + + } + libvarpd_plugin_free(vpr); +} diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp.h b/usr/src/lib/varpd/svp/common/libvarpd_svp.h new file mode 100644 index 0000000000..8192b842ce --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp.h @@ -0,0 +1,357 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. 
+ */ + +#ifndef _LIBVARPD_SVP_H +#define _LIBVARPD_SVP_H + +/* + * Implementation details of the SVP plugin and the SVP protocol. + */ + +#include <netinet/in.h> +#include <sys/ethernet.h> +#include <thread.h> +#include <synch.h> +#include <libvarpd_provider.h> +#include <sys/avl.h> +#include <port.h> +#include <sys/list.h> +#include <bunyan.h> + +#include <libvarpd_svp_prot.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct svp svp_t; +typedef struct svp_remote svp_remote_t; +typedef struct svp_conn svp_conn_t; +typedef struct svp_query svp_query_t; + +typedef void (*svp_event_f)(port_event_t *, void *); + +typedef struct svp_event { + svp_event_f se_func; + void *se_arg; + int se_events; +} svp_event_t; + +typedef void (*svp_timer_f)(void *); + +typedef struct svp_timer { + svp_timer_f st_func; /* Timer callback function */ + void *st_arg; /* Timer callback arg */ + boolean_t st_oneshot; /* Is timer a one shot? */ + uint32_t st_value; /* periodic or one-shot time */ + /* Fields below here are private to the svp_timer implementation */ + uint64_t st_expire; /* Next expiration */ + boolean_t st_delivering; /* Are we currently delivering this */ + avl_node_t st_link; +} svp_timer_t; + +/* + * Note, both the svp_log_ack_t and svp_lrm_req_t are not part of this structure + * as they are rather variable sized data and we don't want to constrain their + * size. Instead, the rdata and wdata members must be set appropriately. + */ +typedef union svp_query_data { + svp_vl2_req_t sqd_vl2r; + svp_vl2_ack_t sqd_vl2a; + svp_vl3_req_t sdq_vl3r; + svp_vl3_ack_t sdq_vl3a; + svp_log_req_t sdq_logr; + svp_lrm_ack_t sdq_lrma; +} svp_query_data_t; + +typedef void (*svp_query_f)(svp_query_t *, void *); + +typedef enum svp_query_state { + SVP_QUERY_INIT = 0x00, + SVP_QUERY_WRITING = 0x01, + SVP_QUERY_READING = 0x02, + SVP_QUERY_FINISHED = 0x03 +} svp_query_state_t; + +/* + * The query structure is usable for all forms of svp queries that end up + * getting passed across. 
Right now it's optimized for the fixed size data + * requests as opposed to requests whose responses will always be streaming in + * nature. Though, the streaming requests are the less common ones we have. We + * may need to make additional changes for those. + */ +struct svp_query { + list_node_t sq_lnode; /* List entry */ + svp_query_f sq_func; /* Callback function */ + svp_query_state_t sq_state; /* Query state */ + void *sq_arg; /* Callback function arg */ + svp_t *sq_svp; /* Pointer back to svp_t */ + svp_req_t sq_header; /* Header for the query */ + svp_query_data_t sq_rdun; /* Union for read data */ + svp_query_data_t sq_wdun; /* Union for write data */ + svp_status_t sq_status; /* Query response status */ + size_t sq_size; /* Query response size */ + void *sq_rdata; /* Read data pointer */ + size_t sq_rsize; /* Read data size */ + void *sq_wdata; /* Write data pointer */ + size_t sq_wsize; /* Write data size */ + hrtime_t sq_acttime; /* Last I/O activity time */ +}; + +typedef enum svp_conn_state { + SVP_CS_ERROR = 0x00, + SVP_CS_INITIAL = 0x01, + SVP_CS_CONNECTING = 0x02, + SVP_CS_BACKOFF = 0x03, + SVP_CS_ACTIVE = 0x04, + SVP_CS_WINDDOWN = 0x05 +} svp_conn_state_t; + +typedef enum svp_conn_error { + SVP_CE_NONE = 0x00, + SVP_CE_ASSOCIATE = 0x01, + SVP_CE_NOPOLLOUT = 0x02, + SVP_CE_SOCKET = 0x03 +} svp_conn_error_t; + +typedef enum svp_conn_flags { + SVP_CF_ADDED = 0x01, + SVP_CF_DEGRADED = 0x02, + SVP_CF_REAP = 0x04, + SVP_CF_TEARDOWN = 0x08, + SVP_CF_UFLAG = 0x0c, + SVP_CF_USER = 0x10 +} svp_conn_flags_t; + +typedef struct svp_conn_out { + svp_query_t *sco_query; + size_t sco_offset; +} svp_conn_out_t; + +typedef struct svp_conn_in { + svp_query_t *sci_query; + svp_req_t sci_req; + size_t sci_offset; +} svp_conn_in_t; + +struct svp_conn { + svp_remote_t *sc_remote; /* RO */ + struct in6_addr sc_addr; /* RO */ + list_node_t sc_rlist; /* svp_remote_t`sr_lock */ + mutex_t sc_lock; + svp_event_t sc_event; + svp_timer_t sc_btimer; + svp_timer_t sc_qtimer; + 
int sc_socket; + uint_t sc_gen; + uint_t sc_nbackoff; + svp_conn_flags_t sc_flags; + svp_conn_state_t sc_cstate; + svp_conn_error_t sc_error; + int sc_errno; + list_t sc_queries; + svp_conn_out_t sc_output; + svp_conn_in_t sc_input; +}; + +typedef enum svp_remote_state { + SVP_RS_LOOKUP_SCHEDULED = 0x01, /* On the DNS Queue */ + SVP_RS_LOOKUP_INPROGRESS = 0x02, /* Doing a DNS lookup */ + SVP_RS_LOOKUP_VALID = 0x04 /* addrinfo valid */ +} svp_remote_state_t; + +/* + * These series of bit-based flags should be ordered such that the most severe + * is first. We only can set one message that user land can see, so if more than + * one is set we want to make sure that one is there. + */ +typedef enum svp_degrade_state { + SVP_RD_DNS_FAIL = 0x01, /* DNS Resolution Failure */ + SVP_RD_REMOTE_FAIL = 0x02, /* cannot reach any remote peers */ + SVP_RD_ALL = 0x03 /* Only suitable for restore */ +} svp_degrade_state_t; + +typedef enum svp_shootdown_flags { + SVP_SD_RUNNING = 0x01, + SVP_SD_QUIESCE = 0x02, + SVP_SD_DORM = 0x04 +} svp_shootdown_flags_t; + +/* + * There is a single svp_sdlog_t per svp_remote_t. It maintains its own lock and + * condition variables. See the big theory statement for more information on how + * it's used. 
+ */ +typedef struct svp_sdlog { + mutex_t sdl_lock; + cond_t sdl_cond; + uint_t sdl_ref; + svp_timer_t sdl_timer; + svp_shootdown_flags_t sdl_flags; + svp_query_t sdl_query; + void *sdl_logack; + void *sdl_logrm; + void *sdl_remote; +} svp_sdlog_t; + +struct svp_remote { + char *sr_hostname; /* RO */ + uint16_t sr_rport; /* RO */ + struct in6_addr sr_uip; /* RO */ + avl_node_t sr_gnode; /* svp_remote_lock */ + svp_remote_t *sr_nexthost; /* svp_host_lock */ + mutex_t sr_lock; + cond_t sr_cond; + svp_remote_state_t sr_state; + svp_degrade_state_t sr_degrade; + struct addrinfo *sr_addrinfo; + avl_tree_t sr_tree; + uint_t sr_count; /* active count */ + uint_t sr_gen; + uint_t sr_tconns; /* total conns + dconns */ + uint_t sr_ndconns; /* number of degraded conns */ + list_t sr_conns; /* all conns */ + svp_sdlog_t sr_shoot; +}; + +/* + * We have a bunch of different things that we get back from the API at the + * plug-in layer. These include: + * + * o OOB Shootdowns + * o VL3->VL2 Lookups + * o VL2->UL3 Lookups + * o VL2 Log invalidations + * o VL3 Log injections + */ +typedef void (*svp_vl2_lookup_f)(svp_t *, svp_status_t, const struct in6_addr *, + const uint16_t, void *); +typedef void (*svp_vl3_lookup_f)(svp_t *, svp_status_t, const uint8_t *, + const struct in6_addr *, const uint16_t, void *); +typedef void (*svp_vl2_invalidation_f)(svp_t *, const uint8_t *); +typedef void (*svp_vl3_inject_f)(svp_t *, const uint16_t, + const struct in6_addr *, const uint8_t *, const uint8_t *); +typedef void (*svp_shootdown_f)(svp_t *, const uint8_t *, + const struct in6_addr *, const uint16_t uport); + +typedef struct svp_cb { + svp_vl2_lookup_f scb_vl2_lookup; + svp_vl3_lookup_f scb_vl3_lookup; + svp_vl2_invalidation_f scb_vl2_invalidate; + svp_vl3_inject_f scb_vl3_inject; + svp_shootdown_f scb_shootdown; +} svp_cb_t; + +/* + * Core implementation structure. 
+ */ +struct svp { + overlay_plugin_dest_t svp_dest; /* RO */ + varpd_provider_handle_t *svp_hdl; /* RO */ + svp_cb_t svp_cb; /* RO */ + uint64_t svp_vid; /* RO */ + avl_node_t svp_rlink; /* Owned by svp_remote */ + svp_remote_t *svp_remote; /* RO iff started */ + mutex_t svp_lock; + char *svp_host; /* svp_lock */ + uint16_t svp_port; /* svp_lock */ + uint16_t svp_uport; /* svp_lock */ + boolean_t svp_huip; /* svp_lock */ + struct in6_addr svp_uip; /* svp_lock */ +}; + +extern bunyan_logger_t *svp_bunyan; + +extern int svp_remote_find(char *, uint16_t, struct in6_addr *, + svp_remote_t **); +extern int svp_remote_attach(svp_remote_t *, svp_t *); +extern void svp_remote_detach(svp_t *); +extern void svp_remote_release(svp_remote_t *); +extern void svp_remote_vl3_lookup(svp_t *, svp_query_t *, + const struct sockaddr *, void *); +extern void svp_remote_vl2_lookup(svp_t *, svp_query_t *, const uint8_t *, + void *); + +/* + * Init functions + */ +extern int svp_remote_init(void); +extern void svp_remote_fini(void); +extern int svp_event_init(void); +extern int svp_event_timer_init(svp_event_t *); +extern void svp_event_fini(void); +extern int svp_host_init(void); +extern int svp_timer_init(void); + +/* + * Timers + */ +extern int svp_tickrate; +extern void svp_timer_add(svp_timer_t *); +extern void svp_timer_remove(svp_timer_t *); + +/* + * Event loop management + */ +extern int svp_event_associate(svp_event_t *, int); +extern int svp_event_dissociate(svp_event_t *, int); +extern int svp_event_inject(svp_event_t *); + +/* + * Connection manager + */ +extern int svp_conn_create(svp_remote_t *, const struct in6_addr *); +extern void svp_conn_destroy(svp_conn_t *); +extern void svp_conn_fallout(svp_conn_t *); +extern void svp_conn_queue(svp_conn_t *, svp_query_t *); + +/* + * FMA related + */ +extern void svp_remote_degrade(svp_remote_t *, svp_degrade_state_t); +extern void svp_remote_restore(svp_remote_t *, svp_degrade_state_t); + +/* + * Misc. 
+ */ +extern int svp_comparator(const void *, const void *); +extern void svp_remote_reassign(svp_remote_t *, svp_conn_t *); +extern void svp_remote_resolved(svp_remote_t *, struct addrinfo *); +extern void svp_host_queue(svp_remote_t *); +extern void svp_query_release(svp_query_t *); +extern void svp_query_crc32(svp_req_t *, void *, size_t); + +/* + * Shootdown related + */ +extern void svp_remote_shootdown_vl3(svp_remote_t *, svp_log_vl3_t *, + svp_sdlog_t *); +extern void svp_remote_shootdown_vl2(svp_remote_t *, svp_log_vl2_t *); +extern void svp_remote_log_request(svp_remote_t *, svp_query_t *, void *, + size_t); +extern void svp_remote_lrm_request(svp_remote_t *, svp_query_t *, void *, + size_t); +extern void svp_shootdown_logr_cb(svp_remote_t *, svp_status_t, void *, size_t); +extern void svp_shootdown_lrm_cb(svp_remote_t *, svp_status_t); +extern void svp_shootdown_vl3_cb(svp_status_t, svp_log_vl3_t *, svp_sdlog_t *); +extern int svp_shootdown_init(svp_remote_t *); +extern void svp_shootdown_fini(svp_remote_t *); +extern void svp_shootdown_start(svp_remote_t *); + +#ifdef __cplusplus +} +#endif + +#endif /* _LIBVARPD_SVP_H */ diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c b/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c new file mode 100644 index 0000000000..4d10d1dba4 --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c @@ -0,0 +1,1030 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +/* + * Logic to manage an individual connection to a remote host. 
+ * + * For more information, see the big theory statement in + * lib/varpd/svp/common/libvarpd_svp.c. + */ + +#include <assert.h> +#include <umem.h> +#include <errno.h> +#include <strings.h> +#include <unistd.h> +#include <stddef.h> +#include <sys/uio.h> +#include <sys/debug.h> + +#include <libvarpd_svp.h> + +int svp_conn_query_timeout = 30; +static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 }; +static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int); + +typedef enum svp_conn_act { + SVP_RA_NONE = 0x00, + SVP_RA_DEGRADE = 0x01, + SVP_RA_RESTORE = 0x02, + SVP_RA_ERROR = 0x03, + SVP_RA_CLEANUP = 0x04 +} svp_conn_act_t; + +static void +svp_conn_inject(svp_conn_t *scp) +{ + int ret; + assert(MUTEX_HELD(&scp->sc_lock)); + + if (scp->sc_flags & SVP_CF_USER) + return; + scp->sc_flags |= SVP_CF_USER; + if ((ret = svp_event_inject(&scp->sc_event)) != 0) + libvarpd_panic("failed to inject event: %d\n", ret); +} + +static void +svp_conn_degrade(svp_conn_t *scp) +{ + svp_remote_t *srp = scp->sc_remote; + + assert(MUTEX_HELD(&srp->sr_lock)); + assert(MUTEX_HELD(&scp->sc_lock)); + + if (scp->sc_flags & SVP_CF_DEGRADED) + return; + + scp->sc_flags |= SVP_CF_DEGRADED; + srp->sr_ndconns++; + if (srp->sr_ndconns == srp->sr_tconns) + svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL); +} + +static void +svp_conn_restore(svp_conn_t *scp) +{ + svp_remote_t *srp = scp->sc_remote; + + assert(MUTEX_HELD(&srp->sr_lock)); + assert(MUTEX_HELD(&scp->sc_lock)); + + if (!(scp->sc_flags & SVP_CF_DEGRADED)) + return; + + scp->sc_flags &= ~SVP_CF_DEGRADED; + if (srp->sr_ndconns == srp->sr_tconns) + svp_remote_restore(srp, SVP_RD_REMOTE_FAIL); + srp->sr_ndconns--; +} + +static void +svp_conn_add(svp_conn_t *scp) +{ + svp_remote_t *srp = scp->sc_remote; + + assert(MUTEX_HELD(&srp->sr_lock)); + assert(MUTEX_HELD(&scp->sc_lock)); + + if (scp->sc_flags & SVP_CF_ADDED) + return; + + list_insert_tail(&srp->sr_conns, scp); + scp->sc_flags |= SVP_CF_ADDED; + srp->sr_tconns++; +} + 
+static void +svp_conn_remove(svp_conn_t *scp) +{ + svp_remote_t *srp = scp->sc_remote; + + assert(MUTEX_HELD(&srp->sr_lock)); + assert(MUTEX_HELD(&scp->sc_lock)); + + if (!(scp->sc_flags & SVP_CF_ADDED)) + return; + + scp->sc_flags &= ~SVP_CF_ADDED; + if (scp->sc_flags & SVP_CF_DEGRADED) + srp->sr_ndconns--; + srp->sr_tconns--; + if (srp->sr_tconns == srp->sr_ndconns) + svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL); +} + +static svp_query_t * +svp_conn_query_find(svp_conn_t *scp, uint32_t id) +{ + svp_query_t *sqp; + + assert(MUTEX_HELD(&scp->sc_lock)); + + for (sqp = list_head(&scp->sc_queries); sqp != NULL; + sqp = list_next(&scp->sc_queries, sqp)) { + if (sqp->sq_header.svp_id == id) + break; + } + + return (sqp); +} + +static svp_conn_act_t +svp_conn_backoff(svp_conn_t *scp) +{ + assert(MUTEX_HELD(&scp->sc_lock)); + + if (close(scp->sc_socket) != 0) + libvarpd_panic("failed to close socket %d: %d\n", + scp->sc_socket, errno); + scp->sc_socket = -1; + + scp->sc_cstate = SVP_CS_BACKOFF; + scp->sc_nbackoff++; + if (scp->sc_nbackoff >= svp_conn_nbackoff) { + scp->sc_btimer.st_value = + svp_conn_backoff_tbl[svp_conn_nbackoff - 1]; + } else { + scp->sc_btimer.st_value = + svp_conn_backoff_tbl[scp->sc_nbackoff - 1]; + } + svp_timer_add(&scp->sc_btimer); + + if (scp->sc_nbackoff > svp_conn_nbackoff) + return (SVP_RA_DEGRADE); + return (SVP_RA_NONE); +} + +static svp_conn_act_t +svp_conn_connect(svp_conn_t *scp) +{ + int ret; + struct sockaddr_in6 in6; + + assert(MUTEX_HELD(&scp->sc_lock)); + assert(scp->sc_cstate == SVP_CS_BACKOFF || + scp->sc_cstate == SVP_CS_INITIAL); + assert(scp->sc_socket == -1); + if (scp->sc_cstate == SVP_CS_INITIAL) + scp->sc_nbackoff = 0; + + scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (scp->sc_socket == -1) { + scp->sc_error = SVP_CE_SOCKET; + scp->sc_errno = errno; + scp->sc_cstate = SVP_CS_ERROR; + return (SVP_RA_DEGRADE); + } + + bzero(&in6, sizeof (struct sockaddr_in6)); + in6.sin6_family = AF_INET6; + 
in6.sin6_port = htons(scp->sc_remote->sr_rport); + bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr)); + ret = connect(scp->sc_socket, (struct sockaddr *)&in6, + sizeof (struct sockaddr_in6)); + if (ret != 0) { + boolean_t async = B_FALSE; + + switch (errno) { + case EACCES: + case EADDRINUSE: + case EAFNOSUPPORT: + case EALREADY: + case EBADF: + case EISCONN: + case ELOOP: + case ENOENT: + case ENOSR: + case EWOULDBLOCK: + libvarpd_panic("unanticipated connect errno %d", errno); + break; + case EINPROGRESS: + case EINTR: + async = B_TRUE; + default: + break; + } + + /* + * So, we will be connecting to this in the future, advance our + * state and make sure that we poll for the next round. + */ + if (async == B_TRUE) { + scp->sc_cstate = SVP_CS_CONNECTING; + scp->sc_event.se_events = POLLOUT | POLLHUP; + ret = svp_event_associate(&scp->sc_event, + scp->sc_socket); + if (ret == 0) + return (SVP_RA_NONE); + scp->sc_error = SVP_CE_ASSOCIATE; + scp->sc_errno = ret; + scp->sc_cstate = SVP_CS_ERROR; + return (SVP_RA_DEGRADE); + } else { + /* + * This call failed, which means that we obtained one of + * the following: + * + * EADDRNOTAVAIL + * ECONNREFUSED + * EIO + * ENETUNREACH + * EHOSTUNREACH + * ENXIO + * ETIMEDOUT + * + * Therefore we need to set ourselves into backoff and + * wait for that to clear up. + */ + return (svp_conn_backoff(scp)); + } + } + + /* + * We've connected. Successfully move ourselves to the bound + * state and start polling. + */ + scp->sc_cstate = SVP_CS_ACTIVE; + scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP; + ret = svp_event_associate(&scp->sc_event, scp->sc_socket); + if (ret == 0) + return (SVP_RA_RESTORE); + scp->sc_error = SVP_CE_ASSOCIATE; + scp->sc_cstate = SVP_CS_ERROR; + + return (SVP_RA_DEGRADE); +} + +/* + * This should be the first call we get after a connect. If we have successfully + * connected, we should see a writeable event. We may also see an error or a + * hang up. 
In either of these cases, we transition to error mode. If there is + * also a readable event, we ignore it at the moment and just let a + * reassociation pick it up so we can simplify the set of state transitions that + * we have. + */ +static svp_conn_act_t +svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp) +{ + int ret, err; + socklen_t sl = sizeof (err); + if (!(pe->portev_events & POLLOUT)) { + scp->sc_errno = 0; + scp->sc_error = SVP_CE_NOPOLLOUT; + scp->sc_cstate = SVP_CS_ERROR; + return (SVP_RA_DEGRADE); + } + + ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl); + if (ret != 0) + libvarpd_panic("unanticipated getsockopt error"); + if (err != 0) { + return (svp_conn_backoff(scp)); + } + + scp->sc_cstate = SVP_CS_ACTIVE; + scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP; + ret = svp_event_associate(&scp->sc_event, scp->sc_socket); + if (ret == 0) + return (SVP_RA_RESTORE); + scp->sc_error = SVP_CE_ASSOCIATE; + scp->sc_errno = ret; + scp->sc_cstate = SVP_CS_ERROR; + return (SVP_RA_DEGRADE); +} + +static svp_conn_act_t +svp_conn_pollout(svp_conn_t *scp) +{ + svp_query_t *sqp; + svp_req_t *req; + size_t off; + struct iovec iov[2]; + int nvecs = 0; + ssize_t ret; + + assert(MUTEX_HELD(&scp->sc_lock)); + + /* + * We need to find a query and start writing it out. 
+ */ + if (scp->sc_output.sco_query == NULL) { + for (sqp = list_head(&scp->sc_queries); sqp != NULL; + sqp = list_next(&scp->sc_queries, sqp)) { + if (sqp->sq_state != SVP_QUERY_INIT) + continue; + break; + } + + if (sqp == NULL) { + scp->sc_event.se_events &= ~POLLOUT; + return (SVP_RA_NONE); + } + + scp->sc_output.sco_query = sqp; + scp->sc_output.sco_offset = 0; + sqp->sq_state = SVP_QUERY_WRITING; + svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize); + } + + sqp = scp->sc_output.sco_query; + req = &sqp->sq_header; + off = scp->sc_output.sco_offset; + if (off < sizeof (svp_req_t)) { + iov[nvecs].iov_base = (void *)((uintptr_t)req + off); + iov[nvecs].iov_len = sizeof (svp_req_t) - off; + nvecs++; + off = 0; + } else { + off -= sizeof (svp_req_t); + } + + iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off); + iov[nvecs].iov_len = sqp->sq_rsize - off; + nvecs++; + + do { + ret = writev(scp->sc_socket, iov, nvecs); + } while (ret == -1 && errno == EAGAIN); + if (ret == -1) { + switch (errno) { + case EAGAIN: + scp->sc_event.se_events |= POLLOUT; + return (SVP_RA_NONE); + case EIO: + case ENXIO: + case ECONNRESET: + return (SVP_RA_ERROR); + default: + libvarpd_panic("unexpected errno: %d", errno); + } + } + + sqp->sq_acttime = gethrtime(); + scp->sc_output.sco_offset += ret; + if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) { + sqp->sq_state = SVP_QUERY_READING; + scp->sc_output.sco_query = NULL; + scp->sc_output.sco_offset = 0; + scp->sc_event.se_events |= POLLOUT; + } + return (SVP_RA_NONE); +} + +static boolean_t +svp_conn_pollin_validate(svp_conn_t *scp) +{ + svp_query_t *sqp; + uint32_t nsize; + uint16_t nvers, nop; + svp_req_t *resp = &scp->sc_input.sci_req; + + assert(MUTEX_HELD(&scp->sc_lock)); + + nvers = ntohs(resp->svp_ver); + nop = ntohs(resp->svp_op); + nsize = ntohl(resp->svp_size); + + if (nvers != SVP_CURRENT_VERSION) { + (void) bunyan_warn(svp_bunyan, "unsupported version", + BUNYAN_T_IP, "remote_ip", &scp->sc_addr, + 
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "version", nvers, + BUNYAN_T_INT32, "operation", nop, + BUNYAN_T_INT32, "response_id", resp->svp_id, + BUNYAN_T_END); + return (B_FALSE); + } + + if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK && + nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) { + (void) bunyan_warn(svp_bunyan, "unsupported operation", + BUNYAN_T_IP, "remote_ip", &scp->sc_addr, + BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "version", nvers, + BUNYAN_T_INT32, "operation", nop, + BUNYAN_T_INT32, "response_id", resp->svp_id, + BUNYAN_T_END); + return (B_FALSE); + } + + sqp = svp_conn_query_find(scp, resp->svp_id); + if (sqp == NULL) { + (void) bunyan_warn(svp_bunyan, "unknown response id", + BUNYAN_T_IP, "remote_ip", &scp->sc_addr, + BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "version", nvers, + BUNYAN_T_INT32, "operation", nop, + BUNYAN_T_INT32, "response_id", resp->svp_id, + BUNYAN_T_END); + return (B_FALSE); + } + + if (sqp->sq_state != SVP_QUERY_READING) { + (void) bunyan_warn(svp_bunyan, + "got response for unexpecting query", + BUNYAN_T_IP, "remote_ip", &scp->sc_addr, + BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "version", nvers, + BUNYAN_T_INT32, "operation", nop, + BUNYAN_T_INT32, "response_id", resp->svp_id, + BUNYAN_T_INT32, "query_state", sqp->sq_state, + BUNYAN_T_END); + return (B_FALSE); + } + + if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) || + (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) || + (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) { + (void) bunyan_warn(svp_bunyan, "response size too large", + BUNYAN_T_IP, "remote_ip", &scp->sc_addr, + BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "version", nvers, + BUNYAN_T_INT32, "operation", nop, + BUNYAN_T_INT32, "response_id", resp->svp_id, + BUNYAN_T_INT32, "response_size", nsize, + 
BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ? + sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t), + BUNYAN_T_INT32, "query_state", sqp->sq_state, + BUNYAN_T_END); + return (B_FALSE); + } + + /* + * The valid size is anything <= to what the user requested, but at + * least svp_log_ack_t bytes large. + */ + if (nop == SVP_R_LOG_ACK) { + const char *msg = NULL; + if (nsize < sizeof (svp_log_ack_t)) + msg = "response size too small"; + else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count) + msg = "response size too large"; + if (msg != NULL) { + (void) bunyan_warn(svp_bunyan, msg, + BUNYAN_T_IP, "remote_ip", &scp->sc_addr, + BUNYAN_T_INT32, "remote_port", + scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "version", nvers, + BUNYAN_T_INT32, "operation", nop, + BUNYAN_T_INT32, "response_id", resp->svp_id, + BUNYAN_T_INT32, "response_size", nsize, + BUNYAN_T_INT32, "expected_size", + ((svp_log_req_t *)sqp->sq_rdata)->svlr_count, + BUNYAN_T_INT32, "query_state", sqp->sq_state, + BUNYAN_T_END); + return (B_FALSE); + } + } + + sqp->sq_size = nsize; + scp->sc_input.sci_query = sqp; + if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK || + nop == SVP_R_LOG_RM_ACK) { + sqp->sq_wdata = &sqp->sq_wdun; + sqp->sq_wsize = sizeof (svp_query_data_t); + } else { + VERIFY(nop == SVP_R_LOG_ACK); + assert(sqp->sq_wdata != NULL); + assert(sqp->sq_wsize != 0); + } + + return (B_TRUE); +} + +static svp_conn_act_t +svp_conn_pollin(svp_conn_t *scp) +{ + size_t off, total; + ssize_t ret; + svp_query_t *sqp; + uint32_t crc; + uint16_t nop; + + assert(MUTEX_HELD(&scp->sc_lock)); + + /* + * No query implies that we're reading in the header and that the offset + * is associted with it. 
+ */ + off = scp->sc_input.sci_offset; + sqp = scp->sc_input.sci_query; + if (scp->sc_input.sci_query == NULL) { + svp_req_t *resp = &scp->sc_input.sci_req; + + assert(off < sizeof (svp_req_t)); + + do { + ret = read(scp->sc_socket, + (void *)((uintptr_t)resp + off), + sizeof (svp_req_t) - off); + } while (ret == -1 && errno == EINTR); + if (ret == -1) { + switch (errno) { + case EAGAIN: + scp->sc_event.se_events |= POLLIN | POLLRDNORM; + return (SVP_RA_NONE); + case EIO: + case ECONNRESET: + return (SVP_RA_ERROR); + break; + default: + libvarpd_panic("unexpeted read errno: %d", + errno); + } + } else if (ret == 0) { + /* Try to reconnect to the remote host */ + return (SVP_RA_ERROR); + } + + /* Didn't get all the data we need */ + if (off + ret < sizeof (svp_req_t)) { + scp->sc_input.sci_offset += ret; + scp->sc_event.se_events |= POLLIN | POLLRDNORM; + return (SVP_RA_NONE); + } + + if (svp_conn_pollin_validate(scp) != B_TRUE) + return (SVP_RA_ERROR); + } + + sqp = scp->sc_input.sci_query; + assert(sqp != NULL); + sqp->sq_acttime = gethrtime(); + total = ntohl(scp->sc_input.sci_req.svp_size); + do { + ret = read(scp->sc_socket, + (void *)((uintptr_t)sqp->sq_wdata + off), + total - off); + } while (ret == -1 && errno == EINTR); + + if (ret == -1) { + switch (errno) { + case EAGAIN: + scp->sc_event.se_events |= POLLIN | POLLRDNORM; + return (SVP_RA_NONE); + case EIO: + case ECONNRESET: + return (SVP_RA_ERROR); + break; + default: + libvarpd_panic("unexpeted read errno: %d", errno); + } + } else if (ret == 0) { + /* Try to reconnect to the remote host */ + return (SVP_RA_ERROR); + } + + if (ret + off < total) { + scp->sc_input.sci_offset += ret; + return (SVP_RA_NONE); + } + + nop = ntohs(scp->sc_input.sci_req.svp_op); + crc = scp->sc_input.sci_req.svp_crc32; + svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total); + if (crc != scp->sc_input.sci_req.svp_crc32) { + (void) bunyan_info(svp_bunyan, "crc32 mismatch", + BUNYAN_T_IP, "remote ip", &scp->sc_addr, + 
BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "version", + ntohs(scp->sc_input.sci_req.svp_ver), + BUNYAN_T_INT32, "operation", nop, + BUNYAN_T_INT32, "response id", + ntohl(scp->sc_input.sci_req.svp_id), + BUNYAN_T_INT32, "query state", sqp->sq_state, + BUNYAN_T_UINT32, "msg_crc", ntohl(crc), + BUNYAN_T_UINT32, "calc_crc", + ntohl(scp->sc_input.sci_req.svp_crc32), + BUNYAN_T_END); + return (SVP_RA_ERROR); + } + scp->sc_input.sci_query = NULL; + scp->sc_input.sci_offset = 0; + + if (nop == SVP_R_VL2_ACK) { + svp_vl2_ack_t *sl2a = sqp->sq_wdata; + sqp->sq_status = ntohl(sl2a->sl2a_status); + } else if (nop == SVP_R_VL3_ACK) { + svp_vl3_ack_t *sl3a = sqp->sq_wdata; + sqp->sq_status = ntohl(sl3a->sl3a_status); + } else if (nop == SVP_R_LOG_ACK) { + svp_log_ack_t *svla = sqp->sq_wdata; + sqp->sq_status = ntohl(svla->svla_status); + } else if (nop == SVP_R_LOG_RM_ACK) { + svp_lrm_ack_t *svra = sqp->sq_wdata; + sqp->sq_status = ntohl(svra->svra_status); + } else { + libvarpd_panic("unhandled nop: %d", nop); + } + + list_remove(&scp->sc_queries, sqp); + mutex_exit(&scp->sc_lock); + + /* + * We have to release all of our resources associated with this entry + * before we call the callback. After we call it, the memory will be + * lost to time. 
+ */ + svp_query_release(sqp); + sqp->sq_func(sqp, sqp->sq_arg); + mutex_enter(&scp->sc_lock); + scp->sc_event.se_events |= POLLIN | POLLRDNORM; + + return (SVP_RA_NONE); +} + +static svp_conn_act_t +svp_conn_reset(svp_conn_t *scp) +{ + svp_remote_t *srp = scp->sc_remote; + + assert(MUTEX_HELD(&srp->sr_lock)); + assert(MUTEX_HELD(&scp->sc_lock)); + + assert(svp_event_dissociate(&scp->sc_event, scp->sc_socket) == + ENOENT); + if (close(scp->sc_socket) != 0) + libvarpd_panic("failed to close socket %d: %d", scp->sc_socket, + errno); + scp->sc_flags &= ~SVP_CF_TEARDOWN; + scp->sc_socket = -1; + scp->sc_cstate = SVP_CS_INITIAL; + scp->sc_input.sci_query = NULL; + scp->sc_output.sco_query = NULL; + + svp_remote_reassign(srp, scp); + + return (svp_conn_connect(scp)); +} + +/* + * This is our general state transition function. We're called here when we want + * to advance part of our state machine as well as to re-arm ourselves. We can + * also end up here from the standard event loop as a result of having a user + * event posted. + */ +static void +svp_conn_handler(port_event_t *pe, void *arg) +{ + svp_conn_t *scp = arg; + svp_remote_t *srp = scp->sc_remote; + svp_conn_act_t ret = SVP_RA_NONE; + svp_conn_state_t oldstate; + + mutex_enter(&scp->sc_lock); + + /* + * Check if one of our event interrupts is set. An event interrupt, such + * as having to be reaped or be torndown is notified by a + * PORT_SOURCE_USER event that tries to take care of this. However, + * because of the fact that the event loop can be ongoing despite this, + * we may get here before the PORT_SOURCE_USER has casued us to get + * here. In such a case, if the PORT_SOURCE_USER event is tagged, then + * we're going to opt to do nothing here and wait for it to come and + * tear us down. That will also indicate to us that we have nothing to + * worry about as far as general timing and the like goes. 
+ */ + if ((scp->sc_flags & SVP_CF_UFLAG) != 0 && + (scp->sc_flags & SVP_CF_USER) != 0 && + pe != NULL && + pe->portev_source != PORT_SOURCE_USER) { + mutex_exit(&scp->sc_lock); + return; + } + + if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) { + scp->sc_flags &= ~SVP_CF_USER; + if ((scp->sc_flags & SVP_CF_UFLAG) == 0) { + mutex_exit(&scp->sc_lock); + return; + } + } + + /* Check if this needs to be freed */ + if (scp->sc_flags & SVP_CF_REAP) { + mutex_exit(&scp->sc_lock); + svp_conn_destroy(scp); + return; + } + + /* Check if this needs to be reset */ + if (scp->sc_flags & SVP_CF_TEARDOWN) { + /* Make sure any other users of this are disassociated */ + ret = SVP_RA_ERROR; + goto out; + } + + switch (scp->sc_cstate) { + case SVP_CS_INITIAL: + case SVP_CS_BACKOFF: + assert(pe == NULL); + ret = svp_conn_connect(scp); + break; + case SVP_CS_CONNECTING: + assert(pe != NULL); + ret = svp_conn_poll_connect(pe, scp); + break; + case SVP_CS_ACTIVE: + case SVP_CS_WINDDOWN: + assert(pe != NULL); + oldstate = scp->sc_cstate; + if (pe->portev_events & POLLOUT) + ret = svp_conn_pollout(scp); + if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN)) + ret = svp_conn_pollin(scp); + + if (oldstate == SVP_CS_WINDDOWN && + (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) { + ret = SVP_RA_CLEANUP; + } + + if (ret == SVP_RA_NONE) { + int err; + if ((err = svp_event_associate(&scp->sc_event, + scp->sc_socket)) != 0) { + scp->sc_error = SVP_CE_ASSOCIATE; + scp->sc_errno = err; + scp->sc_cstate = SVP_CS_ERROR; + ret = SVP_RA_DEGRADE; + } + } + break; + default: + libvarpd_panic("svp_conn_handler encountered unexpected " + "state: %d", scp->sc_cstate); + } +out: + mutex_exit(&scp->sc_lock); + + if (ret == SVP_RA_NONE) + return; + + mutex_enter(&srp->sr_lock); + mutex_enter(&scp->sc_lock); + if (ret == SVP_RA_ERROR) + ret = svp_conn_reset(scp); + + if (ret == SVP_RA_DEGRADE) + svp_conn_degrade(scp); + if (ret == SVP_RA_RESTORE) + svp_conn_restore(scp); + + if (ret == 
SVP_RA_CLEANUP) { + svp_conn_remove(scp); + scp->sc_flags |= SVP_CF_REAP; + svp_conn_inject(scp); + } + mutex_exit(&scp->sc_lock); + mutex_exit(&srp->sr_lock); +} + +static void +svp_conn_backtimer(void *arg) +{ + svp_conn_t *scp = arg; + + svp_conn_handler(NULL, scp); +} + +/* + * This fires every svp_conn_query_timeout seconds. Its purpos is to determine + * if we haven't heard back on a request with in svp_conn_query_timeout seconds. + * If any of the svp_conn_query_t's that have been started (indicated by + * svp_query_t`sq_acttime != -1), and more than svp_conn_query_timeout seconds + * have passed, we basically tear this connection down and reassign outstanding + * queries. + */ +static void +svp_conn_querytimer(void *arg) +{ + int ret; + svp_query_t *sqp; + svp_conn_t *scp = arg; + hrtime_t now = gethrtime(); + + mutex_enter(&scp->sc_lock); + + /* + * If we're not in the active state, then we don't care about this as + * we're already either going to die or we have no connections to worry + * about. + */ + if (scp->sc_cstate != SVP_CS_ACTIVE) { + mutex_exit(&scp->sc_lock); + return; + } + + for (sqp = list_head(&scp->sc_queries); sqp != NULL; + sqp = list_next(&scp->sc_queries, sqp)) { + if (sqp->sq_acttime == -1) + continue; + if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout) + break; + } + + /* Nothing timed out, we're good here */ + if (sqp == NULL) { + mutex_exit(&scp->sc_lock); + return; + } + + (void) bunyan_warn(svp_bunyan, "query timed out on connection", + BUNYAN_T_IP, "remote_ip", &scp->sc_addr, + BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, + BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op), + BUNYAN_T_END); + + /* + * Begin the tear down process for this connect. If we lose the + * disassociate, then we don't inject an event. See the big theory + * statement in libvarpd_svp.c for more information. 
+ */ + scp->sc_flags |= SVP_CF_TEARDOWN; + + ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket); + if (ret == 0) + svp_conn_inject(scp); + else + VERIFY(ret == ENOENT); + + mutex_exit(&scp->sc_lock); +} + +/* + * This connection has fallen out of DNS, figure out what we need to do with it. + */ +void +svp_conn_fallout(svp_conn_t *scp) +{ + svp_remote_t *srp = scp->sc_remote; + + assert(MUTEX_HELD(&srp->sr_lock)); + + mutex_enter(&scp->sc_lock); + switch (scp->sc_cstate) { + case SVP_CS_ERROR: + /* + * Connection is already inactive, so it's safe to tear down. + * Fire it off through the state machine to tear down via the + * backoff timer. + */ + svp_conn_remove(scp); + scp->sc_flags |= SVP_CF_REAP; + svp_conn_inject(scp); + break; + case SVP_CS_INITIAL: + case SVP_CS_BACKOFF: + case SVP_CS_CONNECTING: + /* + * Here, we have something actively going on, so we'll let it be + * clean up the next time we hit the event loop by the event + * loop itself. As it has no connections, there isn't much to + * really do, though we'll take this chance to go ahead and + * remove it from the remote. + */ + svp_conn_remove(scp); + scp->sc_flags |= SVP_CF_REAP; + svp_conn_inject(scp); + break; + case SVP_CS_ACTIVE: + case SVP_CS_WINDDOWN: + /* + * If there are no outstanding queries, then we should simply + * clean this up now,t he same way we would with the others. + * Othewrise, as we know the event loop is ongoing, we'll make + * sure that these entries get cleaned up once they're done. 
+ */ + scp->sc_cstate = SVP_CS_WINDDOWN; + if (list_is_empty(&scp->sc_queries)) { + svp_conn_remove(scp); + scp->sc_flags |= SVP_CF_REAP; + svp_conn_inject(scp); + } + break; + default: + libvarpd_panic("svp_conn_fallout encountered" + "unkonwn state"); + } + mutex_exit(&scp->sc_lock); +} + +int +svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr) +{ + int ret; + svp_conn_t *scp; + + assert(MUTEX_HELD(&srp->sr_lock)); + scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT); + if (scp == NULL) + return (ENOMEM); + + if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK, + NULL)) != 0) { + umem_free(scp, sizeof (svp_conn_t)); + return (ret); + } + + scp->sc_remote = srp; + scp->sc_event.se_func = svp_conn_handler; + scp->sc_event.se_arg = scp; + scp->sc_btimer.st_func = svp_conn_backtimer; + scp->sc_btimer.st_arg = scp; + scp->sc_btimer.st_oneshot = B_TRUE; + scp->sc_btimer.st_value = 1; + + scp->sc_qtimer.st_func = svp_conn_querytimer; + scp->sc_qtimer.st_arg = scp; + scp->sc_qtimer.st_oneshot = B_FALSE; + scp->sc_qtimer.st_value = svp_conn_query_timeout; + + scp->sc_socket = -1; + + list_create(&scp->sc_queries, sizeof (svp_query_t), + offsetof(svp_query_t, sq_lnode)); + scp->sc_gen = srp->sr_gen; + bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr)); + scp->sc_cstate = SVP_CS_INITIAL; + mutex_enter(&scp->sc_lock); + svp_conn_add(scp); + mutex_exit(&scp->sc_lock); + + /* Now that we're locked and loaded, add our timers */ + svp_timer_add(&scp->sc_qtimer); + svp_timer_add(&scp->sc_btimer); + + return (0); +} + +/* + * At the time of calling, the entry has been removed from all lists. In + * addition, the entries state should be SVP_CS_ERROR, therefore, we know that + * the fd should not be associated with the event loop. We'll double check that + * just in case. We should also have already been removed from the remote's + * list. 
+ */ +void +svp_conn_destroy(svp_conn_t *scp) +{ + int ret; + + mutex_enter(&scp->sc_lock); + if (scp->sc_cstate != SVP_CS_ERROR) + libvarpd_panic("asked to tear down an active connection"); + if (scp->sc_flags & SVP_CF_ADDED) + libvarpd_panic("asked to remove a connection still in " + "the remote list\n"); + if (!list_is_empty(&scp->sc_queries)) + libvarpd_panic("asked to remove a connection with non-empty " + "query list"); + + if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) != + ENOENT) { + libvarpd_panic("dissociate failed or was actually " + "associated: %d", ret); + } + mutex_exit(&scp->sc_lock); + + /* Verify our timers are killed */ + svp_timer_remove(&scp->sc_btimer); + svp_timer_remove(&scp->sc_qtimer); + + if (scp->sc_socket != -1 && close(scp->sc_socket) != 0) + libvarpd_panic("failed to close svp_conn_t`scp_socket fd " + "%d: %d", scp->sc_socket, errno); + + list_destroy(&scp->sc_queries); + umem_free(scp, sizeof (svp_conn_t)); +} + +void +svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp) +{ + assert(MUTEX_HELD(&scp->sc_lock)); + assert(scp->sc_cstate == SVP_CS_ACTIVE); + + sqp->sq_acttime = -1; + list_insert_tail(&scp->sc_queries, sqp); + if (!(scp->sc_event.se_events & POLLOUT)) { + scp->sc_event.se_events |= POLLOUT; + /* + * If this becomes frequent, we should instead give up on this + * set of connections instead of aborting. + */ + if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0) + libvarpd_panic("svp_event_associate failed somehow"); + } +} diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_crc.c b/usr/src/lib/varpd/svp/common/libvarpd_svp_crc.c new file mode 100644 index 0000000000..ade47ff998 --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_crc.c @@ -0,0 +1,53 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. 
+ * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015, Joyent, Inc. + */ + +/* + * Perform standard crc32 functions. + * + * For more information, see the big theory statement in + * lib/varpd/svp/common/libvarpd_svp.c. + * + * Really, this should just be a libcrc. + */ + +#include <sys/crc32.h> +#include <stdint.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <inttypes.h> +#include <libvarpd_svp.h> + +static uint32_t svp_crc32_tab[] = { CRC32_TABLE }; + +static uint32_t +svp_crc32(uint32_t old, const uint8_t *buf, size_t len) +{ + uint32_t out; + + CRC32(out, buf, len, old, svp_crc32_tab); + return (out); +} + +void +svp_query_crc32(svp_req_t *shp, void *buf, size_t data) +{ + uint32_t crc = -1U; + + shp->svp_crc32 = 0; + crc = svp_crc32(crc, (uint8_t *)shp, sizeof (svp_req_t)); + crc = svp_crc32(crc, buf, data); + crc = ~crc; + shp->svp_crc32 = htonl(crc); +} diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_host.c b/usr/src/lib/varpd/svp/common/libvarpd_svp_host.c new file mode 100644 index 0000000000..e91cc30e9d --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_host.c @@ -0,0 +1,171 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +/* + * DNS Host-name related functions. + * + * For more information, see the big theory statement in + * lib/varpd/svp/common/libvarpd_svp.c. 
+ */ + +#include <sys/socket.h> +#include <netdb.h> +#include <thread.h> +#include <synch.h> +#include <assert.h> +#include <errno.h> + +#include <libvarpd_svp.h> + +int svp_host_nthreads = 8; + +static mutex_t svp_host_lock = ERRORCHECKMUTEX; +static cond_t svp_host_cv = DEFAULTCV; +static svp_remote_t *svp_host_head; + +/* ARGSUSED */ +static void * +svp_host_loop(void *unused) +{ + for (;;) { + int err; + svp_remote_t *srp; + struct addrinfo *addrs; + + mutex_enter(&svp_host_lock); + while (svp_host_head == NULL) + (void) cond_wait(&svp_host_cv, &svp_host_lock); + srp = svp_host_head; + svp_host_head = srp->sr_nexthost; + if (svp_host_head != NULL) + (void) cond_signal(&svp_host_cv); + mutex_exit(&svp_host_lock); + + mutex_enter(&srp->sr_lock); + assert(srp->sr_state & SVP_RS_LOOKUP_SCHEDULED); + srp->sr_state &= ~SVP_RS_LOOKUP_SCHEDULED; + if (srp->sr_state & SVP_RS_LOOKUP_INPROGRESS) { + mutex_exit(&srp->sr_lock); + continue; + } + srp->sr_state |= SVP_RS_LOOKUP_INPROGRESS; + mutex_exit(&srp->sr_lock); + + for (;;) { + err = getaddrinfo(srp->sr_hostname, NULL, NULL, &addrs); + if (err == 0) + break; + if (err != 0) { + switch (err) { + case EAI_ADDRFAMILY: + case EAI_BADFLAGS: + case EAI_FAMILY: + case EAI_SERVICE: + case EAI_SOCKTYPE: + case EAI_OVERFLOW: + default: + libvarpd_panic("unexpected getaddrinfo " + "failure: %d", err); + break; + case EAI_AGAIN: + case EAI_MEMORY: + case EAI_SYSTEM: + continue; + case EAI_FAIL: + case EAI_NODATA: + case EAI_NONAME: + /* + * At this point in time we have + * something which isn't very good. This + * may have been a typo or something may + * have been destroyed. We should go + * ahead and degrade this overall + * instance, because we're not going to + * make much forward progress... It'd be + * great if we could actually issue more + * of an EREPORT to describe what + * happened, some day. 
+ */ + mutex_enter(&srp->sr_lock); + svp_remote_degrade(srp, + SVP_RD_DNS_FAIL); + mutex_exit(&srp->sr_lock); + break; + } + } + break; + } + + if (err == 0) { + /* + * We've successfully resolved something, mark this + * degredation over for now. + */ + mutex_enter(&srp->sr_lock); + svp_remote_restore(srp, SVP_RD_DNS_FAIL); + mutex_exit(&srp->sr_lock); + svp_remote_resolved(srp, addrs); + } + + mutex_enter(&srp->sr_lock); + srp->sr_state &= ~SVP_RS_LOOKUP_INPROGRESS; + (void) cond_broadcast(&srp->sr_cond); + mutex_exit(&srp->sr_lock); + } + + /* LINTED: E_STMT_NOT_REACHED */ + return (NULL); +} + +void +svp_host_queue(svp_remote_t *srp) +{ + svp_remote_t *s; + mutex_enter(&svp_host_lock); + mutex_enter(&srp->sr_lock); + if (srp->sr_state & SVP_RS_LOOKUP_SCHEDULED) { + mutex_exit(&srp->sr_lock); + mutex_exit(&svp_host_lock); + return; + } + srp->sr_state |= SVP_RS_LOOKUP_SCHEDULED; + s = svp_host_head; + while (s != NULL && s->sr_nexthost != NULL) + s = s->sr_nexthost; + if (s == NULL) { + assert(s == svp_host_head); + svp_host_head = srp; + } else { + s->sr_nexthost = srp; + } + srp->sr_nexthost = NULL; + (void) cond_signal(&svp_host_cv); + mutex_exit(&srp->sr_lock); + mutex_exit(&svp_host_lock); +} + +int +svp_host_init(void) +{ + int i; + + for (i = 0; i < svp_host_nthreads; i++) { + if (thr_create(NULL, 0, svp_host_loop, NULL, + THR_DETACHED | THR_DAEMON, NULL) != 0) + return (errno); + } + + return (0); +} diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_loop.c b/usr/src/lib/varpd/svp/common/libvarpd_svp_loop.c new file mode 100644 index 0000000000..18a79b9dff --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_loop.c @@ -0,0 +1,210 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. 
+ * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +/* + * Event loop mechanism for our backend. + * + * For more information, see the big theory statement in + * lib/varpd/svp/common/libvarpd_svp.c. + */ + +#include <unistd.h> +#include <thread.h> +#include <port.h> +#include <signal.h> +#include <time.h> +#include <errno.h> +#include <umem.h> + +#include <libvarpd_svp.h> + +typedef struct svp_event_loop { + int sel_port; /* RO */ + int sel_nthread; /* RO */ + thread_t *sel_threads; /* RO */ + boolean_t sel_stop; /* svp_elock */ + timer_t sel_hosttimer; +} svp_event_loop_t; + +static svp_event_loop_t svp_event; +static mutex_t svp_elock = ERRORCHECKMUTEX; + +/* ARGSUSED */ +static void * +svp_event_thr(void *arg) +{ + for (;;) { + int ret; + port_event_t pe; + svp_event_t *sep; + + mutex_enter(&svp_elock); + if (svp_event.sel_stop == B_TRUE) { + mutex_exit(&svp_elock); + break; + } + mutex_exit(&svp_elock); + + ret = port_get(svp_event.sel_port, &pe, NULL); + if (ret != 0) { + switch (errno) { + case EFAULT: + case EBADF: + case EINVAL: + libvarpd_panic("unexpected port_get errno: %d", + errno); + default: + break; + } + } + + if (pe.portev_user == NULL) + libvarpd_panic("received event (%p) without " + "protev_user set", &pe); + sep = (svp_event_t *)pe.portev_user; + sep->se_func(&pe, sep->se_arg); + } + + return (NULL); +} + +int +svp_event_associate(svp_event_t *sep, int fd) +{ + int ret; + + ret = port_associate(svp_event.sel_port, PORT_SOURCE_FD, fd, + sep->se_events, sep); + if (ret != 0) { + switch (errno) { + case EBADF: + case EBADFD: + case EINVAL: + case EAGAIN: + libvarpd_panic("unexpected port_associate error: %d", + errno); + default: + ret = errno; + break; + } + } + + return (ret); +} + +/* ARGSUSED */ +int +svp_event_dissociate(svp_event_t *sep, int fd) +{ + int ret; + 
+ ret = port_dissociate(svp_event.sel_port, PORT_SOURCE_FD, fd); + if (ret != 0) { + if (errno != ENOENT) + libvarpd_panic("unexpected port_dissociate error: %d", + errno); + ret = errno; + } + return (ret); +} + +int +svp_event_inject(svp_event_t *user) +{ + return (port_send(svp_event.sel_port, 0, user)); +} + +int +svp_event_timer_init(svp_event_t *sep) +{ + port_notify_t pn; + struct sigevent evp; + struct itimerspec ts; + + pn.portnfy_port = svp_event.sel_port; + pn.portnfy_user = sep; + evp.sigev_notify = SIGEV_PORT; + evp.sigev_value.sival_ptr = &pn; + + if (timer_create(CLOCK_REALTIME, &evp, &svp_event.sel_hosttimer) != 0) + return (errno); + + ts.it_value.tv_sec = svp_tickrate; + ts.it_value.tv_nsec = 0; + ts.it_interval.tv_sec = svp_tickrate; + ts.it_interval.tv_nsec = 0; + + if (timer_settime(svp_event.sel_hosttimer, TIMER_RELTIME, &ts, + NULL) != 0) { + int ret = errno; + (void) timer_delete(svp_event.sel_hosttimer); + return (ret); + } + + return (0); +} + +int +svp_event_init(void) +{ + long i, ncpus; + + svp_event.sel_port = port_create(); + if (svp_event.sel_port == -1) + return (errno); + + ncpus = sysconf(_SC_NPROCESSORS_ONLN) * 2 + 1; + if (ncpus <= 0) + libvarpd_panic("sysconf for nprocs failed... 
%d/%d", + ncpus, errno); + + svp_event.sel_threads = umem_alloc(sizeof (thread_t) * ncpus, + UMEM_DEFAULT); + if (svp_event.sel_threads == NULL) { + int ret = errno; + (void) timer_delete(svp_event.sel_hosttimer); + (void) close(svp_event.sel_port); + svp_event.sel_port = -1; + return (ret); + } + + for (i = 0; i < ncpus; i++) { + int ret; + thread_t *thr = &svp_event.sel_threads[i]; + + ret = thr_create(NULL, 0, svp_event_thr, NULL, + THR_DETACHED | THR_DAEMON, thr); + if (ret != 0) { + ret = errno; + (void) timer_delete(svp_event.sel_hosttimer); + (void) close(svp_event.sel_port); + svp_event.sel_port = -1; + return (errno); + } + } + + return (0); +} + +void +svp_event_fini(void) +{ + mutex_enter(&svp_elock); + svp_event.sel_stop = B_TRUE; + mutex_exit(&svp_elock); + + (void) timer_delete(svp_event.sel_hosttimer); + (void) close(svp_event.sel_port); +} diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_prot.h b/usr/src/lib/varpd/svp/common/libvarpd_svp_prot.h new file mode 100644 index 0000000000..16dbdbec05 --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_prot.h @@ -0,0 +1,236 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. 
+ */ + +#ifndef _LIBVARPD_SVP_PROT_H +#define _LIBVARPD_SVP_PROT_H + +/* + * SVP protocol Definitions + */ + +#include <sys/types.h> +#include <inttypes.h> +#include <sys/ethernet.h> +#include <netinet/in.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* + * SDC VXLAN Protocol Definitions + */ + +#define SVP_VERSION_ONE 1 +#define SVP_CURRENT_VERSION SVP_VERSION_ONE + +typedef struct svp_req { + uint16_t svp_ver; + uint16_t svp_op; + uint32_t svp_size; + uint32_t svp_id; + uint32_t svp_crc32; +} svp_req_t; + +typedef enum svp_op { + SVP_R_UNKNOWN = 0x00, + SVP_R_PING = 0x01, + SVP_R_PONG = 0x02, + SVP_R_VL2_REQ = 0x03, + SVP_R_VL2_ACK = 0x04, + SVP_R_VL3_REQ = 0x05, + SVP_R_VL3_ACK = 0x06, + SVP_R_BULK_REQ = 0x07, + SVP_R_BULK_ACK = 0x08, + SVP_R_LOG_REQ = 0x09, + SVP_R_LOG_ACK = 0x0A, + SVP_R_LOG_RM = 0x0B, + SVP_R_LOG_RM_ACK = 0x0C, + SVP_R_SHOOTDOWN = 0x0D +} svp_op_t; + +typedef enum svp_status { + SVP_S_OK = 0x00, /* Everything OK */ + SVP_S_FATAL = 0x01, /* Fatal error, close connection */ + SVP_S_NOTFOUND = 0x02, /* Entry not found */ + SVP_S_BADL3TYPE = 0x03, /* Unknown svp_vl3_type_t */ + SVP_S_BADBULK = 0x04 /* Unknown svp_bulk_type_t */ +} svp_status_t; + +/* + * A client issues the SVP_R_VL2_REQ whenever it needs to perform a VLS->UL3 + * lookup. Requests have the following structure: + */ +typedef struct svp_vl2_req { + uint8_t sl2r_mac[ETHERADDRL]; + uint8_t sl2r_pad[2]; + uint32_t sl2r_vnetid; +} svp_vl2_req_t; + +/* + * This is the message a server uses to reply to the SVP_R_VL2_REQ. If the + * destination on the underlay is an IPv4 address, it should be encoded as an + * IPv4-mapped IPv6 address. + */ +typedef struct svp_vl2_ack { + uint16_t sl2a_status; + uint16_t sl2a_port; + uint8_t sl2a_addr[16]; +} svp_vl2_ack_t; + + +/* + * A client issues the SVP_R_VL3_REQ request whenever it needs to perform a + * VL3->VL2 lookup. Note, that this also implicitly performs a VL2->UL3 lookup + * as well. 
The sl3r_type member is used to indicate the kind of lookup type + * that we're performing, eg. is it a L3 or L2. + */ +typedef enum svp_vl3_type { + SVP_VL3_IP = 0x01, + SVP_VL3_IPV6 = 0x02 +} svp_vl3_type_t; + +typedef struct svp_vl3_req { + uint8_t sl3r_ip[16]; + uint32_t sl3r_type; + uint32_t sl3r_vnetid; +} svp_vl3_req_t; + +/* + * This response, corresponding to the SVP_R_VL3_ACK, includes an answer to both + * the VL3->VL2 and the VL2->UL3 requests. + */ +typedef struct svp_vl3_ack { + uint32_t sl3a_status; + uint8_t sl3a_mac[ETHERADDRL]; + uint16_t sl3a_uport; + uint8_t sl3a_uip[16]; +} svp_vl3_ack_t; + +/* + * SVP_R_BULK_REQ requests a bulk dump of data. Currently we have two kinds of + * data tables that we need to dump: VL3->VL2 mappings and VL2->UL3 mappings. + * The kind that we want is indicated using the svbr_type member. + */ +typedef enum svp_bulk_type { + SVP_BULK_VL2 = 0x01, + SVP_BULK_VL3 = 0x02 +} svp_bulk_type_t; + +typedef struct svp_bulk_req { + uint32_t svbr_type; +} svp_bulk_req_t; + +/* + * When replying to a bulk request (SVP_R_BULK_ACK), data is streamed back + * across. The format of the data is currently undefined and as we work on the + * system, we'll get a better understanding of what this should look like. A + * client may need to stream such a request to disk, or the format will need to + * be in a streamable format that allows the client to construct data. + */ +typedef struct svp_bulk_ack { + uint32_t svba_status; + uint32_t svba_type; + uint8_t svba_data[]; +} svp_bulk_ack_t; + +/* + * SVP_R_LOG_REQ requests a log entries from the specified log from the server. + * The total number of bytes that the user is ready to receive is in svlr_count. + * However, the server should not block for data if none is available and thus + * may return less than svlr_count bytes back. We identify the IP address of the + * underlay to use here explicitly. 
+ */ +typedef struct svp_log_req { + uint32_t svlr_count; + uint8_t svlr_ip[16]; +} svp_log_req_t; + +/* + * The server replies to a log request by sending a series of log entries. + * These log entries may be a mixture of both vl2 and vl3 records. The reply is + * a stream of bytes after the status message whose length is determined baseed + * on the header itself. Each entry begins with a uint32_t that describes its + * type and then is followed by the remaining data payload. The next entry + * follows immediately which again begins with the uint32_t word that describes + * what it should be. + */ +typedef enum svp_log_type { + SVP_LOG_VL2 = 0x01, + SVP_LOG_VL3 = 0x02 +} svp_log_type_t; + +typedef struct svp_log_vl2 { + uint32_t svl2_type; /* Should be SVP_LOG_VL2 */ + uint8_t svl2_id[16]; /* 16-byte UUID */ + uint8_t svl2_mac[ETHERADDRL]; + uint8_t svl2_pad[2]; + uint32_t svl2_vnetid; +} svp_log_vl2_t; + +typedef struct svp_log_vl3 { + uint32_t svl3_type; /* Should be SVP_LOG_VL3 */ + uint8_t svl3_id[16]; /* 16-byte UUID */ + uint8_t svl3_ip[16]; + uint8_t svl3_pad[2]; + uint16_t svl3_vlan; + uint32_t svl3_vnetid; +} svp_log_vl3_t; + +typedef struct svp_log_ack { + uint32_t svla_status; + uint8_t svla_data[]; +} svp_log_ack_t; + +/* + * SVP_R_LOG_RM is used after the client successfully processes a series of the + * log stream. It replies to tell the server that it can remove those IDs from + * processing. The IDs used are the same IDs that were in the individual + * SVP_R_LOG_ACK entries. + */ +typedef struct svp_lrm_req { + uint32_t svrr_count; + uint8_t svrr_ids[]; +} svp_lrm_req_t; + +/* + * SVP_R_LOG_RM_ACK is used to indicate that a log entry has been successfully + * deleted and at this point it makes sense to go and ask for another + * SVP_R_LOG_REQ. 
+ */ +typedef struct svp_lrm_ack { + uint32_t svra_status; +} svp_lrm_ack_t; + +/* + * A shootdown (SVP_R_SHOOTDOWN) is used by a CN to reply to another CN that it + * sent an invalid entry that could not be processed. This should be a + * relatively infrequent occurrence. Unlike the rest of the messages, there is + * no reply to it. It's a single request to try and help get us out there. When + * a node receives this, it will issue a conditional revocation ioctl, that + * removes the entry if and only if, it matches the IP. That way if we've + * already gotten an updated entry for this, we don't remove it again. + */ +typedef struct svp_shootdown { + uint8_t svsd_mac[ETHERADDRL]; + uint8_t svsd_pad[2]; + uint32_t svsd_vnetid; +} svp_shootdown_t; + +#ifdef __cplusplus +} +#endif + +#endif /* _LIBVARPD_SVP_PROT_H */ diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_remote.c b/usr/src/lib/varpd/svp/common/libvarpd_svp_remote.c new file mode 100644 index 0000000000..99775f93c0 --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_remote.c @@ -0,0 +1,821 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2019 Joyent, Inc. + */ + +/* + * Remote backend management + * + * For more information, see the big theory statement in + * lib/varpd/svp/common/libvarpd_svp.c. 
+ */ + +#include <umem.h> +#include <strings.h> +#include <string.h> +#include <stddef.h> +#include <thread.h> +#include <synch.h> +#include <assert.h> +#include <sys/socket.h> +#include <netdb.h> +#include <errno.h> +#include <libidspace.h> + +#include <libvarpd_provider.h> +#include <libvarpd_svp.h> + +typedef struct svp_shoot_vl3 { + svp_query_t ssv_query; + struct sockaddr_in6 ssv_sock; + svp_log_vl3_t *ssv_vl3; + svp_sdlog_t *ssv_log; +} svp_shoot_vl3_t; + +static mutex_t svp_remote_lock = ERRORCHECKMUTEX; +static avl_tree_t svp_remote_tree; +static svp_timer_t svp_dns_timer; +static id_space_t *svp_idspace; +static int svp_dns_timer_rate = 30; /* seconds */ + +static void +svp_remote_mkfmamsg(svp_remote_t *srp, svp_degrade_state_t state, char *buf, + size_t buflen) +{ + switch (state) { + case SVP_RD_DNS_FAIL: + (void) snprintf(buf, buflen, "failed to resolve or find " + "entries for hostname %s", srp->sr_hostname); + break; + case SVP_RD_REMOTE_FAIL: + (void) snprintf(buf, buflen, "cannot reach any remote peers"); + break; + default: + (void) snprintf(buf, buflen, "unkonwn error state: %d", state); + } +} + +static int +svp_remote_comparator(const void *l, const void *r) +{ + int ret; + const svp_remote_t *lr = l, *rr = r; + + ret = strcmp(lr->sr_hostname, rr->sr_hostname); + if (ret > 0) + return (1); + else if (ret < 0) + return (-1); + + if (lr->sr_rport > rr->sr_rport) + return (1); + else if (lr->sr_rport < rr->sr_rport) + return (-1); + + return (memcmp(&lr->sr_uip, &rr->sr_uip, sizeof (struct in6_addr))); +} + +void +svp_query_release(svp_query_t *sqp) +{ + id_free(svp_idspace, sqp->sq_header.svp_id); +} + +static void +svp_remote_destroy(svp_remote_t *srp) +{ + size_t len; + + /* + * Clean up any unrelated DNS information. At this point we know that + * we're not in the remote tree. That means, that svp_remote_dns_timer + * cannot queue us. However, if any of our DNS related state flags are + * set, we have to hang out. 
+ */ + mutex_enter(&srp->sr_lock); + while (srp->sr_state & + (SVP_RS_LOOKUP_SCHEDULED | SVP_RS_LOOKUP_INPROGRESS)) { + (void) cond_wait(&srp->sr_cond, &srp->sr_lock); + } + mutex_exit(&srp->sr_lock); + svp_shootdown_fini(srp); + + if (cond_destroy(&srp->sr_cond) != 0) + libvarpd_panic("failed to destroy cond sr_cond"); + + if (mutex_destroy(&srp->sr_lock) != 0) + libvarpd_panic("failed to destroy mutex sr_lock"); + + if (srp->sr_addrinfo != NULL) + freeaddrinfo(srp->sr_addrinfo); + len = strlen(srp->sr_hostname) + 1; + umem_free(srp->sr_hostname, len); + umem_free(srp, sizeof (svp_remote_t)); +} + +static int +svp_remote_create(const char *host, uint16_t port, struct in6_addr *uip, + svp_remote_t **outp) +{ + size_t hlen; + svp_remote_t *remote; + + assert(MUTEX_HELD(&svp_remote_lock)); + + remote = umem_zalloc(sizeof (svp_remote_t), UMEM_DEFAULT); + if (remote == NULL) { + mutex_exit(&svp_remote_lock); + return (ENOMEM); + } + + if (svp_shootdown_init(remote) != 0) { + umem_free(remote, sizeof (svp_remote_t)); + mutex_exit(&svp_remote_lock); + return (ENOMEM); + } + + hlen = strlen(host) + 1; + remote->sr_hostname = umem_alloc(hlen, UMEM_DEFAULT); + if (remote->sr_hostname == NULL) { + svp_shootdown_fini(remote); + umem_free(remote, sizeof (svp_remote_t)); + mutex_exit(&svp_remote_lock); + return (ENOMEM); + } + remote->sr_rport = port; + if (mutex_init(&remote->sr_lock, + USYNC_THREAD | LOCK_ERRORCHECK, NULL) != 0) + libvarpd_panic("failed to create mutex sr_lock"); + if (cond_init(&remote->sr_cond, USYNC_PROCESS, NULL) != 0) + libvarpd_panic("failed to create cond sr_cond"); + list_create(&remote->sr_conns, sizeof (svp_conn_t), + offsetof(svp_conn_t, sc_rlist)); + avl_create(&remote->sr_tree, svp_comparator, sizeof (svp_t), + offsetof(svp_t, svp_rlink)); + (void) strlcpy(remote->sr_hostname, host, hlen); + remote->sr_count = 1; + remote->sr_uip = *uip; + + svp_shootdown_start(remote); + + *outp = remote; + return (0); +} + +int +svp_remote_find(char *host, 
uint16_t port, struct in6_addr *uip, + svp_remote_t **outp) +{ + int ret; + svp_remote_t lookup, *remote; + + lookup.sr_hostname = host; + lookup.sr_rport = port; + lookup.sr_uip = *uip; + mutex_enter(&svp_remote_lock); + remote = avl_find(&svp_remote_tree, &lookup, NULL); + if (remote != NULL) { + assert(remote->sr_count > 0); + remote->sr_count++; + *outp = remote; + mutex_exit(&svp_remote_lock); + return (0); + } + + if ((ret = svp_remote_create(host, port, uip, outp)) != 0) { + mutex_exit(&svp_remote_lock); + return (ret); + } + + avl_add(&svp_remote_tree, *outp); + mutex_exit(&svp_remote_lock); + + /* Make sure DNS is up to date */ + svp_host_queue(*outp); + + return (0); +} + +void +svp_remote_release(svp_remote_t *srp) +{ + mutex_enter(&svp_remote_lock); + mutex_enter(&srp->sr_lock); + srp->sr_count--; + if (srp->sr_count != 0) { + mutex_exit(&srp->sr_lock); + mutex_exit(&svp_remote_lock); + return; + } + mutex_exit(&srp->sr_lock); + + avl_remove(&svp_remote_tree, srp); + mutex_exit(&svp_remote_lock); + svp_remote_destroy(srp); +} + +int +svp_remote_attach(svp_remote_t *srp, svp_t *svp) +{ + svp_t check; + avl_index_t where; + + mutex_enter(&srp->sr_lock); + if (svp->svp_remote != NULL) + libvarpd_panic("failed to create mutex sr_lock"); + + /* + * We require everything except shootdowns + */ + if (svp->svp_cb.scb_vl2_lookup == NULL) + libvarpd_panic("missing callback scb_vl2_lookup"); + if (svp->svp_cb.scb_vl3_lookup == NULL) + libvarpd_panic("missing callback scb_vl3_lookup"); + if (svp->svp_cb.scb_vl2_invalidate == NULL) + libvarpd_panic("missing callback scb_vl2_invalidate"); + if (svp->svp_cb.scb_vl3_inject == NULL) + libvarpd_panic("missing callback scb_vl3_inject"); + + check.svp_vid = svp->svp_vid; + if (avl_find(&srp->sr_tree, &check, &where) != NULL) + libvarpd_panic("found duplicate entry with vid %ld", + svp->svp_vid); + avl_insert(&srp->sr_tree, svp, where); + svp->svp_remote = srp; + mutex_exit(&srp->sr_lock); + + return (0); +} + +void 
+svp_remote_detach(svp_t *svp) +{ + svp_t *lookup; + svp_remote_t *srp = svp->svp_remote; + + if (srp == NULL) + libvarpd_panic("trying to detach remote when none exists"); + + mutex_enter(&srp->sr_lock); + lookup = avl_find(&srp->sr_tree, svp, NULL); + if (lookup == NULL || lookup != svp) + libvarpd_panic("inconsitent remote avl tree..."); + avl_remove(&srp->sr_tree, svp); + svp->svp_remote = NULL; + mutex_exit(&srp->sr_lock); + svp_remote_release(srp); +} + +/* + * Walk the list of connections and find the first one that's available, the + * move it to the back of the list so it's less likely to be used again. + */ +static boolean_t +svp_remote_conn_queue(svp_remote_t *srp, svp_query_t *sqp) +{ + svp_conn_t *scp; + + assert(MUTEX_HELD(&srp->sr_lock)); + for (scp = list_head(&srp->sr_conns); scp != NULL; + scp = list_next(&srp->sr_conns, scp)) { + mutex_enter(&scp->sc_lock); + if (scp->sc_cstate != SVP_CS_ACTIVE) { + mutex_exit(&scp->sc_lock); + continue; + } + svp_conn_queue(scp, sqp); + mutex_exit(&scp->sc_lock); + list_remove(&srp->sr_conns, scp); + list_insert_tail(&srp->sr_conns, scp); + return (B_TRUE); + } + + return (B_FALSE); +} + +static void +svp_remote_vl2_lookup_cb(svp_query_t *sqp, void *arg) +{ + svp_t *svp = sqp->sq_svp; + svp_vl2_ack_t *vl2a = (svp_vl2_ack_t *)sqp->sq_wdata; + + if (sqp->sq_status == SVP_S_OK) + svp->svp_cb.scb_vl2_lookup(svp, sqp->sq_status, + (struct in6_addr *)vl2a->sl2a_addr, ntohs(vl2a->sl2a_port), + arg); + else + svp->svp_cb.scb_vl2_lookup(svp, sqp->sq_status, NULL, 0, arg); +} + +void +svp_remote_vl2_lookup(svp_t *svp, svp_query_t *sqp, const uint8_t *mac, + void *arg) +{ + svp_remote_t *srp; + svp_vl2_req_t *vl2r = &sqp->sq_rdun.sqd_vl2r; + + srp = svp->svp_remote; + sqp->sq_func = svp_remote_vl2_lookup_cb; + sqp->sq_arg = arg; + sqp->sq_svp = svp; + sqp->sq_state = SVP_QUERY_INIT; + sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION); + sqp->sq_header.svp_op = htons(SVP_R_VL2_REQ); + sqp->sq_header.svp_size = 
htonl(sizeof (svp_vl2_req_t)); + sqp->sq_header.svp_id = id_alloc(svp_idspace); + if (sqp->sq_header.svp_id == (id_t)-1) + libvarpd_panic("failed to allcoate from svp_idspace: %d", + errno); + sqp->sq_header.svp_crc32 = htonl(0); + sqp->sq_rdata = vl2r; + sqp->sq_rsize = sizeof (svp_vl2_req_t); + sqp->sq_wdata = NULL; + sqp->sq_wsize = 0; + + bcopy(mac, vl2r->sl2r_mac, ETHERADDRL); + vl2r->sl2r_vnetid = ntohl(svp->svp_vid); + + mutex_enter(&srp->sr_lock); + if (svp_remote_conn_queue(srp, sqp) == B_FALSE) + svp->svp_cb.scb_vl2_lookup(svp, SVP_S_FATAL, NULL, 0, arg); + mutex_exit(&srp->sr_lock); +} + +static void +svp_remote_vl3_lookup_cb(svp_query_t *sqp, void *arg) +{ + svp_t *svp = sqp->sq_svp; + svp_vl3_ack_t *vl3a = (svp_vl3_ack_t *)sqp->sq_wdata; + + if (sqp->sq_status == SVP_S_OK) + svp->svp_cb.scb_vl3_lookup(svp, sqp->sq_status, vl3a->sl3a_mac, + (struct in6_addr *)vl3a->sl3a_uip, ntohs(vl3a->sl3a_uport), + arg); + else + svp->svp_cb.scb_vl3_lookup(svp, sqp->sq_status, NULL, NULL, 0, + arg); +} + +static void +svp_remote_vl3_common(svp_remote_t *srp, svp_query_t *sqp, + const struct sockaddr *addr, svp_query_f func, void *arg, uint32_t vid) +{ + svp_vl3_req_t *vl3r = &sqp->sq_rdun.sdq_vl3r; + + if (addr->sa_family != AF_INET && addr->sa_family != AF_INET6) + libvarpd_panic("unexpected sa_family for the vl3 lookup"); + + sqp->sq_func = func; + sqp->sq_arg = arg; + sqp->sq_state = SVP_QUERY_INIT; + sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION); + sqp->sq_header.svp_op = htons(SVP_R_VL3_REQ); + sqp->sq_header.svp_size = htonl(sizeof (svp_vl3_req_t)); + sqp->sq_header.svp_id = id_alloc(svp_idspace); + if (sqp->sq_header.svp_id == (id_t)-1) + libvarpd_panic("failed to allcoate from svp_idspace: %d", + errno); + sqp->sq_header.svp_crc32 = htonl(0); + sqp->sq_rdata = vl3r; + sqp->sq_rsize = sizeof (svp_vl3_req_t); + sqp->sq_wdata = NULL; + sqp->sq_wsize = 0; + + if (addr->sa_family == AF_INET6) { + struct sockaddr_in6 *s6 = (struct sockaddr_in6 *)addr; + 
vl3r->sl3r_type = htonl(SVP_VL3_IPV6); + bcopy(&s6->sin6_addr, vl3r->sl3r_ip, + sizeof (struct in6_addr)); + } else { + struct sockaddr_in *s4 = (struct sockaddr_in *)addr; + struct in6_addr v6; + + vl3r->sl3r_type = htonl(SVP_VL3_IP); + IN6_INADDR_TO_V4MAPPED(&s4->sin_addr, &v6); + bcopy(&v6, vl3r->sl3r_ip, sizeof (struct in6_addr)); + } + vl3r->sl3r_vnetid = htonl(vid); + + mutex_enter(&srp->sr_lock); + if (svp_remote_conn_queue(srp, sqp) == B_FALSE) { + sqp->sq_status = SVP_S_FATAL; + sqp->sq_func(sqp, arg); + } + mutex_exit(&srp->sr_lock); +} + +/* + * This is a request to do a VL3 look-up that originated internally as opposed + * to coming from varpd. As such we need a slightly different query callback + * function upon completion and don't go through the normal path with the svp_t. + */ +void +svp_remote_vl3_logreq(svp_remote_t *srp, svp_query_t *sqp, uint32_t vid, + const struct sockaddr *addr, svp_query_f func, void *arg) +{ + svp_remote_vl3_common(srp, sqp, addr, func, arg, vid); +} + +void +svp_remote_vl3_lookup(svp_t *svp, svp_query_t *sqp, + const struct sockaddr *addr, void *arg) +{ + svp_remote_t *srp = svp->svp_remote; + + sqp->sq_svp = svp; + svp_remote_vl3_common(srp, sqp, addr, svp_remote_vl3_lookup_cb, + arg, svp->svp_vid); +} + +static void +svp_remote_log_request_cb(svp_query_t *sqp, void *arg) +{ + svp_remote_t *srp = sqp->sq_arg; + + assert(sqp->sq_wdata != NULL); + if (sqp->sq_status == SVP_S_OK) + svp_shootdown_logr_cb(srp, sqp->sq_status, sqp->sq_wdata, + sqp->sq_size); + else + svp_shootdown_logr_cb(srp, sqp->sq_status, NULL, 0); +} + +void +svp_remote_log_request(svp_remote_t *srp, svp_query_t *sqp, void *buf, + size_t buflen) +{ + svp_log_req_t *logr = &sqp->sq_rdun.sdq_logr; + boolean_t queued; + + sqp->sq_func = svp_remote_log_request_cb; + sqp->sq_state = SVP_QUERY_INIT; + sqp->sq_arg = srp; + sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION); + sqp->sq_header.svp_op = htons(SVP_R_LOG_REQ); + sqp->sq_header.svp_size = htonl(sizeof 
(svp_log_req_t)); + sqp->sq_header.svp_id = id_alloc(svp_idspace); + if (sqp->sq_header.svp_id == (id_t)-1) + libvarpd_panic("failed to allcoate from svp_idspace: %d", + errno); + sqp->sq_header.svp_crc32 = htonl(0); + sqp->sq_rdata = logr; + sqp->sq_rsize = sizeof (svp_log_req_t); + sqp->sq_wdata = buf; + sqp->sq_wsize = buflen; + + logr->svlr_count = htonl(buflen); + bcopy(&srp->sr_uip, logr->svlr_ip, sizeof (struct in6_addr)); + + /* + * If this fails, there isn't much that we can't do. Give the callback + * with a fatal status. + */ + mutex_enter(&srp->sr_lock); + queued = svp_remote_conn_queue(srp, sqp); + mutex_exit(&srp->sr_lock); + + if (queued == B_FALSE) + svp_shootdown_logr_cb(srp, SVP_S_FATAL, NULL, 0); +} + +static void +svp_remote_lrm_request_cb(svp_query_t *sqp, void *arg) +{ + svp_remote_t *srp = arg; + + svp_shootdown_lrm_cb(srp, sqp->sq_status); +} + +void +svp_remote_lrm_request(svp_remote_t *srp, svp_query_t *sqp, void *buf, + size_t buflen) +{ + boolean_t queued; + svp_lrm_req_t *svrr = buf; + + sqp->sq_func = svp_remote_lrm_request_cb; + sqp->sq_state = SVP_QUERY_INIT; + sqp->sq_arg = srp; + sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION); + sqp->sq_header.svp_op = htons(SVP_R_LOG_RM); + sqp->sq_header.svp_size = htonl(buflen); + sqp->sq_header.svp_id = id_alloc(svp_idspace); + if (sqp->sq_header.svp_id == (id_t)-1) + libvarpd_panic("failed to allcoate from svp_idspace: %d", + errno); + sqp->sq_header.svp_crc32 = htonl(0); + sqp->sq_rdata = buf; + sqp->sq_rsize = buflen; + sqp->sq_wdata = NULL; + sqp->sq_wsize = 0; + + /* + * We need to fix up the count to be in proper network order. + */ + svrr->svrr_count = htonl(svrr->svrr_count); + + /* + * If this fails, there isn't much that we can't do. Give the callback + * with a fatal status. 
+ */ + mutex_enter(&srp->sr_lock); + queued = svp_remote_conn_queue(srp, sqp); + mutex_exit(&srp->sr_lock); + + if (queued == B_FALSE) + svp_shootdown_logr_cb(srp, SVP_S_FATAL, NULL, 0); +} + +/* ARGSUSED */ +void +svp_remote_dns_timer(void *unused) +{ + svp_remote_t *s; + mutex_enter(&svp_remote_lock); + for (s = avl_first(&svp_remote_tree); s != NULL; + s = AVL_NEXT(&svp_remote_tree, s)) { + svp_host_queue(s); + } + mutex_exit(&svp_remote_lock); +} + +void +svp_remote_resolved(svp_remote_t *srp, struct addrinfo *newaddrs) +{ + struct addrinfo *a; + svp_conn_t *scp; + int ngen; + + mutex_enter(&srp->sr_lock); + srp->sr_gen++; + ngen = srp->sr_gen; + mutex_exit(&srp->sr_lock); + + for (a = newaddrs; a != NULL; a = a->ai_next) { + struct in6_addr in6; + struct in6_addr *addrp; + + if (a->ai_family != AF_INET && a->ai_family != AF_INET6) + continue; + + if (a->ai_family == AF_INET) { + struct sockaddr_in *v4; + v4 = (struct sockaddr_in *)a->ai_addr; + addrp = &in6; + IN6_INADDR_TO_V4MAPPED(&v4->sin_addr, addrp); + } else { + struct sockaddr_in6 *v6; + v6 = (struct sockaddr_in6 *)a->ai_addr; + addrp = &v6->sin6_addr; + } + + mutex_enter(&srp->sr_lock); + for (scp = list_head(&srp->sr_conns); scp != NULL; + scp = list_next(&srp->sr_conns, scp)) { + mutex_enter(&scp->sc_lock); + if (bcmp(addrp, &scp->sc_addr, + sizeof (struct in6_addr)) == 0) { + scp->sc_gen = ngen; + mutex_exit(&scp->sc_lock); + break; + } + mutex_exit(&scp->sc_lock); + } + + /* + * We need to be careful in the assumptions that we make here, + * as there's a good chance that svp_conn_create will + * drop the svp_remote_t`sr_lock to kick off its effective event + * loop. + */ + if (scp == NULL) + (void) svp_conn_create(srp, addrp); + mutex_exit(&srp->sr_lock); + } + + /* + * Now it's time to clean things up. We do not actively clean up the + * current connections that we have, instead allowing them to stay + * around assuming that they're still useful. 
Instead, we go through and
+	 * purge the degraded list for anything that's from an older generation.
+	 */
+	mutex_enter(&srp->sr_lock);
+	for (scp = list_head(&srp->sr_conns); scp != NULL;
+	    scp = list_next(&srp->sr_conns, scp)) {
+		boolean_t fall = B_FALSE;
+		mutex_enter(&scp->sc_lock);
+		if (scp->sc_gen < srp->sr_gen)
+			fall = B_TRUE;
+		mutex_exit(&scp->sc_lock);
+		if (fall == B_TRUE)
+			svp_conn_fallout(scp);
+	}
+	mutex_exit(&srp->sr_lock);
+}
+
+/*
+ * This connection is in the process of being reset, we need to reassign all of
+ * its queries to other places or mark them as fatal. Note that the first
+ * connection was the one in flight when this failed. We always mark it as
+ * failed to avoid trying to reset its state.
+ */
+void
+svp_remote_reassign(svp_remote_t *srp, svp_conn_t *scp)
+{
+	boolean_t first = B_TRUE;
+	svp_query_t *sqp;
+
+	assert(MUTEX_HELD(&srp->sr_lock));
+
+	/*
+	 * As we try to reassigning all of its queries, remove it from the list.
+	 */
+	list_remove(&srp->sr_conns, scp);
+
+	while ((sqp = list_remove_head(&scp->sc_queries)) != NULL) {
+		if (first == B_TRUE) {
+			first = B_FALSE;
+			sqp->sq_status = SVP_S_FATAL;
+			sqp->sq_func(sqp, sqp->sq_arg);
+			continue;
+		}
+
+		sqp->sq_acttime = -1;
+
+		/*
+		 * We may want to maintain a queue of these for some time rather
+		 * than just failing them all.
+		 */
+		if (svp_remote_conn_queue(srp, sqp) == B_FALSE) {
+			sqp->sq_status = SVP_S_FATAL;
+			sqp->sq_func(sqp, sqp->sq_arg);
+		}
+	}
+
+	/*
+	 * Now that we're done, go ahead and re-insert. 
 */
+	list_insert_tail(&srp->sr_conns, scp);
+}
+
+void
+svp_remote_degrade(svp_remote_t *srp, svp_degrade_state_t flag)
+{
+	int sf, nf;
+	char buf[256];
+
+	assert(MUTEX_HELD(&srp->sr_lock));
+
+	if (flag == SVP_RD_ALL || flag == 0)
+		libvarpd_panic("invalid flag passed to degrade");
+
+	if ((flag & srp->sr_degrade) != 0) {
+		return;
+	}
+
+	sf = ffs(srp->sr_degrade);
+	nf = ffs(flag);
+	srp->sr_degrade |= flag;
+	if (sf == 0 || sf > nf) {
+		svp_t *svp;
+		svp_remote_mkfmamsg(srp, flag, buf, sizeof (buf));
+
+		for (svp = avl_first(&srp->sr_tree); svp != NULL;
+		    svp = AVL_NEXT(&srp->sr_tree, svp)) {
+			libvarpd_fma_degrade(svp->svp_hdl, buf);
+		}
+	}
+}
+
+void
+svp_remote_restore(svp_remote_t *srp, svp_degrade_state_t flag)
+{
+	int sf, nf;
+
+	assert(MUTEX_HELD(&srp->sr_lock));
+	sf = ffs(srp->sr_degrade);
+	if ((srp->sr_degrade & flag) != flag)
+		return;
+	srp->sr_degrade &= ~flag;
+	nf = ffs(srp->sr_degrade);
+
+	/*
+	 * If we're now empty, restore the device. If we still are degraded, but
+	 * we now have a higher base than we used to, change the message. 
+ */ + if (srp->sr_degrade == 0) { + svp_t *svp; + for (svp = avl_first(&srp->sr_tree); svp != NULL; + svp = AVL_NEXT(&srp->sr_tree, svp)) { + libvarpd_fma_restore(svp->svp_hdl); + } + } else if (nf != sf) { + svp_t *svp; + char buf[256]; + + svp_remote_mkfmamsg(srp, 1U << (nf - 1), buf, sizeof (buf)); + for (svp = avl_first(&srp->sr_tree); svp != NULL; + svp = AVL_NEXT(&srp->sr_tree, svp)) { + libvarpd_fma_degrade(svp->svp_hdl, buf); + } + } +} + +void +svp_remote_shootdown_vl3_cb(svp_query_t *sqp, void *arg) +{ + svp_shoot_vl3_t *squery = arg; + svp_log_vl3_t *svl3 = squery->ssv_vl3; + svp_sdlog_t *sdl = squery->ssv_log; + + if (sqp->sq_status == SVP_S_OK) { + svp_t *svp, lookup; + + svp_remote_t *srp = sdl->sdl_remote; + svp_vl3_ack_t *vl3a = (svp_vl3_ack_t *)sqp->sq_wdata; + + lookup.svp_vid = ntohl(svl3->svl3_vnetid); + mutex_enter(&srp->sr_lock); + if ((svp = avl_find(&srp->sr_tree, &lookup, NULL)) != NULL) { + svp->svp_cb.scb_vl3_inject(svp, ntohs(svl3->svl3_vlan), + (struct in6_addr *)svl3->svl3_ip, vl3a->sl3a_mac, + NULL); + } + mutex_exit(&srp->sr_lock); + + } + + svp_shootdown_vl3_cb(sqp->sq_status, svl3, sdl); + + umem_free(squery, sizeof (svp_shoot_vl3_t)); +} + +void +svp_remote_shootdown_vl3(svp_remote_t *srp, svp_log_vl3_t *svl3, + svp_sdlog_t *sdl) +{ + svp_shoot_vl3_t *squery; + + squery = umem_zalloc(sizeof (svp_shoot_vl3_t), UMEM_DEFAULT); + if (squery == NULL) { + svp_shootdown_vl3_cb(SVP_S_FATAL, svl3, sdl); + return; + } + + squery->ssv_vl3 = svl3; + squery->ssv_log = sdl; + squery->ssv_sock.sin6_family = AF_INET6; + bcopy(svl3->svl3_ip, &squery->ssv_sock.sin6_addr, + sizeof (svl3->svl3_ip)); + svp_remote_vl3_logreq(srp, &squery->ssv_query, ntohl(svl3->svl3_vnetid), + (struct sockaddr *)&squery->ssv_sock, svp_remote_shootdown_vl3_cb, + squery); +} + +void +svp_remote_shootdown_vl2(svp_remote_t *srp, svp_log_vl2_t *svl2) +{ + svp_t *svp, lookup; + + lookup.svp_vid = ntohl(svl2->svl2_vnetid); + mutex_enter(&srp->sr_lock); + if ((svp = 
avl_find(&srp->sr_tree, &lookup, NULL)) != NULL) {
+		svp->svp_cb.scb_vl2_invalidate(svp, svl2->svl2_mac);
+	}
+	mutex_exit(&srp->sr_lock);
+}
+
+int
+svp_remote_init(void)
+{
+	svp_idspace = id_space_create("svp_req_ids", 1, INT32_MAX);
+	if (svp_idspace == NULL)
+		return (errno);
+	avl_create(&svp_remote_tree, svp_remote_comparator,
+	    sizeof (svp_remote_t), offsetof(svp_remote_t, sr_gnode));
+	svp_dns_timer.st_func = svp_remote_dns_timer;
+	svp_dns_timer.st_arg = NULL;
+	svp_dns_timer.st_oneshot = B_FALSE;
+	svp_dns_timer.st_value = svp_dns_timer_rate;
+	svp_timer_add(&svp_dns_timer);
+	return (0);
+}
+
+void
+svp_remote_fini(void)
+{
+	svp_timer_remove(&svp_dns_timer);
+	avl_destroy(&svp_remote_tree);
+	if (svp_idspace != NULL)
+		id_space_destroy(svp_idspace);
+}
diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_shootdown.c b/usr/src/lib/varpd/svp/common/libvarpd_svp_shootdown.c
new file mode 100644
index 0000000000..76afb2519f
--- /dev/null
+++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_shootdown.c
@@ -0,0 +1,474 @@
+/*
+ * This file and its contents are supplied under the terms of the
+ * Common Development and Distribution License ("CDDL"), version 1.0.
+ * You may only use this file in accordance with the terms of version
+ * 1.0 of the CDDL.
+ *
+ * A full copy of the text of the CDDL should have accompanied this
+ * source. A copy of the CDDL is also available via the Internet at
+ * http://www.illumos.org/license/CDDL.
+ */
+
+/*
+ * Copyright 2015 Joyent, Inc.
+ */
+
+/*
+ * Shootdown processing logic.
+ *
+ * For more information, see the big theory statement in
+ * lib/varpd/svp/common/libvarpd_svp.c. 
+ */ + +#include <umem.h> +#include <sys/uuid.h> +#include <assert.h> +#include <strings.h> +#include <errno.h> +#include <sys/debug.h> + +#include <libvarpd_provider.h> +#include <libvarpd_svp.h> + +/* + * When we've determined that there's nothing left for us to do, then we go + * ahead and wait svp_shootdown_base seconds + up to an additional + * svp_shootdown_base seconds before asking again. However, if there is actually + * some work going on, just use the svp_shootdown_cont time. + */ +static int svp_shootdown_base = 5; +static int svp_shootdown_cont = 1; + +/* + * These are sizes for our logack and logrm buffers. The sizing of the shootdown + * buffer would give us approximately 18 or so VL3 entries and 32 VL2 entries + * or some combination thereof. While it's a bit of overkill, we just use the + * same sized buffer for the list of uuids that we pass to remove log entries + * that we've acted upon. + */ +static int svp_shootdown_buf = 1024; + +static void +svp_shootdown_schedule(svp_sdlog_t *sdl, boolean_t cont) +{ + assert(MUTEX_HELD(&sdl->sdl_lock)); + + if (cont == B_TRUE) { + sdl->sdl_timer.st_value = svp_shootdown_cont; + } else { + sdl->sdl_timer.st_value = svp_shootdown_base + + arc4random_uniform(svp_shootdown_base + 1); + } + svp_timer_add(&sdl->sdl_timer); +} + +void +svp_shootdown_lrm_cb(svp_remote_t *srp, svp_status_t status) +{ + svp_sdlog_t *sdl = &srp->sr_shoot; + + mutex_enter(&sdl->sdl_lock); + sdl->sdl_flags &= ~SVP_SD_RUNNING; + svp_shootdown_schedule(sdl, B_TRUE); + mutex_exit(&sdl->sdl_lock); + + if (status != SVP_S_OK) { + (void) bunyan_warn(svp_bunyan, "SVP_R_LOG_RM failed", + BUNYAN_T_STRING, "remote_host", srp->sr_hostname, + BUNYAN_T_INT32, "remote_port", srp->sr_rport, + BUNYAN_T_INT32, "status", status, + BUNYAN_T_END); + } +} + +static void +svp_shootdown_ref(svp_sdlog_t *sdl) +{ + mutex_enter(&sdl->sdl_lock); + sdl->sdl_ref++; + mutex_exit(&sdl->sdl_lock); +} + +static void +svp_shootdown_rele(svp_sdlog_t *sdl) +{ + 
svp_lrm_req_t *svrr = sdl->sdl_logrm; + boolean_t next; + + mutex_enter(&sdl->sdl_lock); + VERIFY(sdl->sdl_ref > 0); + sdl->sdl_ref--; + if (sdl->sdl_ref > 0) { + mutex_exit(&sdl->sdl_lock); + return; + } + + /* + * At this point we know that we hold the last reference, therefore it's + * safe for us to go ahead and clean up and move on and attempt to + * deliver the reply. We always deliver the reply by going through the + * timer. This can be rather important as the final reference may be + * coming through a failed query and it's not always safe for us to + * callback into the remote routines from this context. + * + * We should only do this if we have a non-zero number of entries to + * take down. + */ + sdl->sdl_flags &= ~SVP_SD_RUNNING; + if (svrr->svrr_count > 0) { + sdl->sdl_flags |= SVP_SD_DORM; + next = B_TRUE; + } else { + next = B_FALSE; + } + svp_shootdown_schedule(sdl, next); + mutex_exit(&sdl->sdl_lock); +} + +/* + * This is a callback used to indicate that the VL3 lookup has completed and an + * entry, if any, has been injected. If the command succeeded, e.g. we got that + * the status was OK or that it was not found, then we will add it to the list + * to shoot down. Otherwise, there's nothing else for us to really do here. 
+ */ +void +svp_shootdown_vl3_cb(svp_status_t status, svp_log_vl3_t *vl3, svp_sdlog_t *sdl) +{ + svp_lrm_req_t *svrr = sdl->sdl_logrm; + + mutex_enter(&sdl->sdl_lock); + if (status == SVP_S_OK || status == SVP_S_NOTFOUND) { + bcopy(vl3->svl3_id, &svrr->svrr_ids[svrr->svrr_count * 16], + UUID_LEN); + svrr->svrr_count++; + } + mutex_exit(&sdl->sdl_lock); + + svp_shootdown_rele(sdl); +} + +static int +svp_shootdown_logr_shoot(void *data, svp_log_type_t type, void *arg) +{ + svp_sdlog_t *sdl = arg; + svp_remote_t *srp = sdl->sdl_remote; + svp_lrm_req_t *svrr = sdl->sdl_logrm; + + if (type != SVP_LOG_VL2 && type != SVP_LOG_VL3) + libvarpd_panic("encountered unknown type: %d\n", type); + + if (type == SVP_LOG_VL2) { + svp_log_vl2_t *svl2 = data; + svp_remote_shootdown_vl2(srp, svl2); + mutex_enter(&sdl->sdl_lock); + bcopy(svl2->svl2_id, &svrr->svrr_ids[svrr->svrr_count * 16], + UUID_LEN); + svrr->svrr_count++; + mutex_exit(&sdl->sdl_lock); + } else { + svp_log_vl3_t *svl3 = data; + + /* Take a hold for the duration of this request */ + svp_shootdown_ref(sdl); + svp_remote_shootdown_vl3(srp, svl3, sdl); + } + + return (0); +} + +static int +svp_shootdown_logr_count(void *data, svp_log_type_t type, void *arg) +{ + uint_t *u = arg; + *u = *u + 1; + return (0); +} + + +static int +svp_shootdown_logr_iter(svp_remote_t *srp, void *buf, size_t len, + int (*cb)(void *, svp_log_type_t, void *), void *arg) +{ + int ret; + off_t cboff = 0; + uint32_t *typep, type; + svp_log_vl2_t *svl2; + svp_log_vl3_t *svl3; + + /* Adjust for initial status word */ + assert(len >= sizeof (uint32_t)); + len -= sizeof (uint32_t); + cboff += sizeof (uint32_t); + + while (len > 0) { + size_t opsz; + + if (len < sizeof (uint32_t)) { + (void) bunyan_warn(svp_bunyan, + "failed to get initial shootdown tag", + BUNYAN_T_STRING, "remote_host", srp->sr_hostname, + BUNYAN_T_INT32, "remote_port", srp->sr_rport, + BUNYAN_T_INT32, "response_size", cboff + len, + BUNYAN_T_INT32, "response_offset", cboff, + 
BUNYAN_T_END); + return (-1); + } + + typep = buf + cboff; + type = ntohl(*typep); + if (type == SVP_LOG_VL2) { + opsz = sizeof (svp_log_vl2_t); + if (len < opsz) { + (void) bunyan_warn(svp_bunyan, + "not enough data for svp_log_vl2_t", + BUNYAN_T_STRING, "remote_host", + srp->sr_hostname, + BUNYAN_T_INT32, "remote_port", + srp->sr_rport, + BUNYAN_T_INT32, "response_size", + cboff + len, + BUNYAN_T_INT32, "response_offset", cboff, + BUNYAN_T_END); + return (-1); + } + svl2 = (void *)typep; + if ((ret = cb(svl2, type, arg)) != 0) + return (ret); + } else if (type == SVP_LOG_VL3) { + + opsz = sizeof (svp_log_vl3_t); + if (len < opsz) { + (void) bunyan_warn(svp_bunyan, + "not enough data for svp_log_vl3_t", + BUNYAN_T_STRING, "remote_host", + srp->sr_hostname, + BUNYAN_T_INT32, "remote_port", + srp->sr_rport, + BUNYAN_T_INT32, "response_size", + cboff + len, + BUNYAN_T_INT32, "response_offset", cboff, + BUNYAN_T_END); + return (-1); + } + svl3 = (void *)typep; + if ((ret = cb(svl3, type, arg)) != 0) + return (ret); + } else { + (void) bunyan_warn(svp_bunyan, + "unknown log structure type", + BUNYAN_T_STRING, "remote_host", + srp->sr_hostname, + BUNYAN_T_INT32, "remote_port", srp->sr_rport, + BUNYAN_T_INT32, "response_size", cboff + len, + BUNYAN_T_INT32, "response_offset", cboff, + BUNYAN_T_INT32, "structure_type", type, + BUNYAN_T_END); + return (-1); + } + len -= opsz; + cboff += opsz; + } + + return (0); +} + +void +svp_shootdown_logr_cb(svp_remote_t *srp, svp_status_t status, void *cbdata, + size_t cbsize) +{ + uint_t count; + svp_sdlog_t *sdl = &srp->sr_shoot; + + if (status != SVP_S_OK) { + (void) bunyan_warn(svp_bunyan, + "log request not OK", + BUNYAN_T_STRING, "remote_host", srp->sr_hostname, + BUNYAN_T_INT32, "remote_port", srp->sr_rport, + BUNYAN_T_INT32, "response_size", cbsize, + BUNYAN_T_INT32, "status", status, + BUNYAN_T_END); + mutex_enter(&sdl->sdl_lock); + sdl->sdl_flags &= ~SVP_SD_RUNNING; + svp_shootdown_schedule(sdl, B_FALSE); + 
mutex_exit(&sdl->sdl_lock); + return; + } + + /* + * First go ahead and count the number of entries. This effectively + * allows us to validate that all the data is valid, if this fails, then + * we fail the request. + */ + count = 0; + if ((svp_shootdown_logr_iter(srp, cbdata, cbsize, + svp_shootdown_logr_count, &count)) != 0) { + mutex_enter(&sdl->sdl_lock); + sdl->sdl_flags &= ~SVP_SD_RUNNING; + svp_shootdown_schedule(sdl, B_FALSE); + mutex_exit(&sdl->sdl_lock); + return; + } + + /* + * If we have no entries, then we're also done. + */ + if (count == 0) { + mutex_enter(&sdl->sdl_lock); + sdl->sdl_flags &= ~SVP_SD_RUNNING; + svp_shootdown_schedule(sdl, B_FALSE); + mutex_exit(&sdl->sdl_lock); + return; + } + + /* + * We have work to do. Because we may have asynchronous VL3 tasks, we're + * going to first grab a reference before we do the iteration. Then, for + * each asynchronous VL3 request we make, that'll also grab a hold. Once + * we're done with the iteration, we'll drop our hold. If that's the + * last one, it'll move on accordingly. + */ + svp_shootdown_ref(sdl); + bzero(sdl->sdl_logrm, svp_shootdown_buf); + + /* + * If this fails, we're going to determine what to do next based on the + * number of entries that were entered into the log removal. At this + * point success or failure don't really look different, all it changes + * is how many entries we have to remove. + */ + (void) svp_shootdown_logr_iter(srp, cbdata, cbsize, + svp_shootdown_logr_shoot, sdl); + + /* + * Now that we're done with our work, release the hold. If we don't have + * any vl3 tasks outstanding, this'll trigger the next phase of the log + * removals. + */ + svp_shootdown_rele(sdl); +} + +static void +svp_shootdown_timer(void *arg) +{ + svp_sdlog_t *sdl = arg; + svp_remote_t *srp = sdl->sdl_remote; + boolean_t init = B_TRUE; + + mutex_enter(&sdl->sdl_lock); + + /* + * If we've been asked to quiesce, we're done. 
+ */ + if ((sdl->sdl_flags & SVP_SD_QUIESCE) != 0) { + mutex_exit(&sdl->sdl_lock); + return; + } + + /* + * We shouldn't be able to have ourselves currently be running and reach + * here. If that's the case, we should immediately panic. + */ + if ((sdl->sdl_flags & SVP_SD_RUNNING) != 0) { + libvarpd_panic("remote %p shootdown timer fired while still " + "running", srp); + } + + if ((sdl->sdl_flags & SVP_SD_DORM) != 0) { + sdl->sdl_flags &= ~SVP_SD_DORM; + init = B_FALSE; + } + + sdl->sdl_flags |= SVP_SD_RUNNING; + mutex_exit(&sdl->sdl_lock); + + if (init == B_FALSE) { + svp_lrm_req_t *svrr = sdl->sdl_logrm; + + bzero(&sdl->sdl_query, sizeof (svp_query_t)); + svp_remote_lrm_request(sdl->sdl_remote, &sdl->sdl_query, svrr, + sizeof (*svrr) + 16 * svrr->svrr_count); + } else { + bzero(&sdl->sdl_query, sizeof (svp_query_t)); + svp_remote_log_request(srp, &sdl->sdl_query, sdl->sdl_logack, + svp_shootdown_buf); + } +} + +void +svp_shootdown_fini(svp_remote_t *srp) +{ + svp_sdlog_t *sdl = &srp->sr_shoot; + + mutex_enter(&sdl->sdl_lock); + sdl->sdl_flags |= SVP_SD_QUIESCE; + mutex_exit(&sdl->sdl_lock); + + svp_timer_remove(&sdl->sdl_timer); + + mutex_enter(&sdl->sdl_lock); + + /* + * Normally svp_timer_remove would be enough. However, the query could + * have been put out again outside of the svp_timer interface. Therefore + * we still need to check for SVP_SD_RUNNING. 
+ */ + while (sdl->sdl_flags & SVP_SD_RUNNING) + (void) cond_wait(&sdl->sdl_cond, &sdl->sdl_lock); + mutex_exit(&sdl->sdl_lock); + + umem_free(sdl->sdl_logack, svp_shootdown_buf); + umem_free(sdl->sdl_logrm, svp_shootdown_buf); + sdl->sdl_logack = NULL; + sdl->sdl_logrm = NULL; + (void) cond_destroy(&sdl->sdl_cond); + (void) mutex_destroy(&sdl->sdl_lock); +} + +void +svp_shootdown_start(svp_remote_t *srp) +{ + svp_sdlog_t *sdl = &srp->sr_shoot; + + mutex_enter(&sdl->sdl_lock); + svp_shootdown_schedule(sdl, B_FALSE); + mutex_exit(&sdl->sdl_lock); +} + +int +svp_shootdown_init(svp_remote_t *srp) +{ + int ret; + svp_sdlog_t *sdl = &srp->sr_shoot; + if ((ret = mutex_init(&sdl->sdl_lock, USYNC_THREAD | LOCK_ERRORCHECK, + NULL)) != 0) + return (ret); + + if ((ret = cond_init(&sdl->sdl_cond, USYNC_THREAD, NULL)) != 0) { + (void) mutex_destroy(&sdl->sdl_lock); + return (ret); + } + + if ((sdl->sdl_logack = umem_alloc(svp_shootdown_buf, UMEM_DEFAULT)) == + NULL) { + ret = errno; + (void) cond_destroy(&sdl->sdl_cond); + (void) mutex_destroy(&sdl->sdl_lock); + return (ret); + } + + if ((sdl->sdl_logrm = umem_alloc(svp_shootdown_buf, UMEM_DEFAULT)) == + NULL) { + ret = errno; + umem_free(sdl->sdl_logack, svp_shootdown_buf); + (void) cond_destroy(&sdl->sdl_cond); + (void) mutex_destroy(&sdl->sdl_lock); + return (ret); + } + + sdl->sdl_remote = srp; + sdl->sdl_timer.st_oneshot = B_TRUE; + sdl->sdl_timer.st_func = svp_shootdown_timer; + sdl->sdl_timer.st_arg = sdl; + + return (0); +} diff --git a/usr/src/lib/varpd/svp/common/libvarpd_svp_timer.c b/usr/src/lib/varpd/svp/common/libvarpd_svp_timer.c new file mode 100644 index 0000000000..10b02748f3 --- /dev/null +++ b/usr/src/lib/varpd/svp/common/libvarpd_svp_timer.c @@ -0,0 +1,150 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. 
+ * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015, Joyent, Inc. + */ + +#include <stddef.h> +#include <libvarpd_svp.h> + +/* + * svp timer backend + * + * This implements all of the logic of maintaining a timer for the svp backend. + * We have a timer that fires at a one second tick. We maintain all of our + * events in avl tree, sorted by the tick that they need to be processed at. + * + * For more information, see the big theory statement in + * lib/varpd/svp/common/libvarpd_svp.c. + */ + +int svp_tickrate = 1; +static svp_event_t svp_timer_event; +static mutex_t svp_timer_lock = ERRORCHECKMUTEX; +static cond_t svp_timer_cv = DEFAULTCV; +static avl_tree_t svp_timer_tree; +static uint64_t svp_timer_nticks; + +static int +svp_timer_comparator(const void *l, const void *r) +{ + const svp_timer_t *lt, *rt; + + lt = l; + rt = r; + + if (lt->st_expire > rt->st_expire) + return (1); + else if (lt->st_expire < rt->st_expire) + return (-1); + + /* + * Multiple timers can have the same delivery time, so sort within that + * by the address of the timer itself. + */ + if ((uintptr_t)lt > (uintptr_t)rt) + return (1); + else if ((uintptr_t)lt < (uintptr_t)rt) + return (-1); + + return (0); +} + +/* ARGSUSED */ +static void +svp_timer_tick(port_event_t *pe, void *arg) +{ + mutex_enter(&svp_timer_lock); + svp_timer_nticks++; + + for (;;) { + svp_timer_t *t; + + t = avl_first(&svp_timer_tree); + if (t == NULL || t->st_expire > svp_timer_nticks) + break; + + avl_remove(&svp_timer_tree, t); + + /* + * We drop this while performing an operation so that way state + * can advance in the face of a long-running callback. 
+ */ + t->st_delivering = B_TRUE; + mutex_exit(&svp_timer_lock); + t->st_func(t->st_arg); + mutex_enter(&svp_timer_lock); + t->st_delivering = B_FALSE; + (void) cond_broadcast(&svp_timer_cv); + if (t->st_oneshot == B_FALSE) { + t->st_expire += t->st_value; + avl_add(&svp_timer_tree, t); + } + } + mutex_exit(&svp_timer_lock); +} + +void +svp_timer_add(svp_timer_t *stp) +{ + if (stp->st_value == 0) + libvarpd_panic("tried to add svp timer with zero value"); + + mutex_enter(&svp_timer_lock); + stp->st_delivering = B_FALSE; + stp->st_expire = svp_timer_nticks + stp->st_value; + avl_add(&svp_timer_tree, stp); + mutex_exit(&svp_timer_lock); +} + +void +svp_timer_remove(svp_timer_t *stp) +{ + mutex_enter(&svp_timer_lock); + + /* + * If the event in question is not currently being delivered, then we + * can stop it before it next fires. If it is currently being delivered, + * we need to wait for that to finish. Because we hold the timer lock, + * we know that it cannot be rearmed. Therefore, we make sure the one + * shot flag is set (so the tick loop won't rearm it after delivery), + * and wait until it's no longer set to delivering. 
+ */ + if (stp->st_delivering == B_FALSE) { + avl_remove(&svp_timer_tree, stp); + mutex_exit(&svp_timer_lock); + return; + } + + stp->st_oneshot = B_TRUE; + while (stp->st_delivering == B_TRUE) + (void) cond_wait(&svp_timer_cv, &svp_timer_lock); + + mutex_exit(&svp_timer_lock); +} + +int +svp_timer_init(void) +{ + int ret; + + svp_timer_event.se_func = svp_timer_tick; + svp_timer_event.se_arg = NULL; + + avl_create(&svp_timer_tree, svp_timer_comparator, sizeof (svp_timer_t), + offsetof(svp_timer_t, st_link)); + + if ((ret = svp_event_timer_init(&svp_timer_event)) != 0) { + avl_destroy(&svp_timer_tree); + } + + return (ret); +} diff --git a/usr/src/lib/varpd/svp/common/llib-lvarpd_svp b/usr/src/lib/varpd/svp/common/llib-lvarpd_svp new file mode 100644 index 0000000000..03c34f4fcb --- /dev/null +++ b/usr/src/lib/varpd/svp/common/llib-lvarpd_svp @@ -0,0 +1,18 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +/* LINTLIBRARY */ +/* PROTOLIB1 */ + diff --git a/usr/src/lib/varpd/svp/common/mapfile-vers b/usr/src/lib/varpd/svp/common/mapfile-vers new file mode 100644 index 0000000000..6b7c5a5067 --- /dev/null +++ b/usr/src/lib/varpd/svp/common/mapfile-vers @@ -0,0 +1,35 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. 
A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +# +# MAPFILE HEADER START +# +# WARNING: STOP NOW. DO NOT MODIFY THIS FILE. +# Object versioning must comply with the rules detailed in +# +# usr/src/lib/README.mapfiles +# +# You should not be making modifications here until you've read the most current +# copy of that file. If you need help, contact a gatekeeper for guidance. +# +# MAPFILE HEADER END +# + +$mapfile_version 2 + +SYMBOL_VERSION SUNWprivate { + local: + *; +}; diff --git a/usr/src/lib/varpd/svp/i386/Makefile b/usr/src/lib/varpd/svp/i386/Makefile new file mode 100644 index 0000000000..f2b4f63da5 --- /dev/null +++ b/usr/src/lib/varpd/svp/i386/Makefile @@ -0,0 +1,18 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com + +install: all $(ROOTLIBS) $(ROOTLINKS) $(ROOTLINT) diff --git a/usr/src/lib/varpd/svp/sparc/Makefile b/usr/src/lib/varpd/svp/sparc/Makefile new file mode 100644 index 0000000000..f2b4f63da5 --- /dev/null +++ b/usr/src/lib/varpd/svp/sparc/Makefile @@ -0,0 +1,18 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. 
+# + +include ../Makefile.com + +install: all $(ROOTLIBS) $(ROOTLINKS) $(ROOTLINT) diff --git a/usr/src/lib/varpd/svp/sparcv9/Makefile b/usr/src/lib/varpd/svp/sparcv9/Makefile new file mode 100644 index 0000000000..d552642882 --- /dev/null +++ b/usr/src/lib/varpd/svp/sparcv9/Makefile @@ -0,0 +1,19 @@ +# +# This file and its contents are supplied under the terms of the +# Common Development and Distribution License ("CDDL"), version 1.0. +# You may only use this file in accordance with the terms of version +# 1.0 of the CDDL. +# +# A full copy of the text of the CDDL should have accompanied this +# source. A copy of the CDDL is also available via the Internet at +# http://www.illumos.org/license/CDDL. +# + +# +# Copyright 2015 Joyent, Inc. +# + +include ../Makefile.com +include ../../../Makefile.lib.64 + +install: all $(ROOTLIBS64) $(ROOTLINKS64) $(ROOTLINT64) |