diff options
author | Michael Biebl <biebl@debian.org> | 2014-04-03 03:12:18 +0200 |
---|---|---|
committer | Michael Biebl <biebl@debian.org> | 2014-04-03 03:12:18 +0200 |
commit | 86c2cb438891cc0cb621b231efac2aa1f3c780df (patch) | |
tree | 1c2550bbabfdbece7c9350ffe260bc053458e41f /runtime | |
parent | 362a79f22d4bc0e8c3f1d2fe1c57915a1e47ba7c (diff) | |
parent | 9374a46543e9c43c009f80def8c3b2506b0b377e (diff) | |
download | rsyslog-86c2cb438891cc0cb621b231efac2aa1f3c780df.tar.gz |
Merge tag 'upstream/8.2.0' into experimental
Upstream version 8.2.0
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/Makefile.in | 254 | ||||
-rw-r--r-- | runtime/batch.h | 62 | ||||
-rw-r--r-- | runtime/conf.c | 6 | ||||
-rw-r--r-- | runtime/glbl.c | 75 | ||||
-rw-r--r-- | runtime/glbl.h | 12 | ||||
-rw-r--r-- | runtime/lookup.c | 5 | ||||
-rw-r--r-- | runtime/module-template.h | 104 | ||||
-rw-r--r-- | runtime/modules.c | 53 | ||||
-rw-r--r-- | runtime/modules.h | 8 | ||||
-rw-r--r-- | runtime/msg.c | 48 | ||||
-rw-r--r-- | runtime/parser.c | 126 | ||||
-rw-r--r-- | runtime/parser.h | 1 | ||||
-rw-r--r-- | runtime/queue.c | 93 | ||||
-rw-r--r-- | runtime/queue.h | 9 | ||||
-rw-r--r-- | runtime/rsconf.c | 10 | ||||
-rw-r--r-- | runtime/rsyslog.c | 5 | ||||
-rw-r--r-- | runtime/rsyslog.h | 35 | ||||
-rw-r--r-- | runtime/ruleset.c | 382 | ||||
-rw-r--r-- | runtime/ruleset.h | 6 | ||||
-rw-r--r-- | runtime/srUtils.h | 2 | ||||
-rw-r--r-- | runtime/srutils.c | 2 | ||||
-rw-r--r-- | runtime/stream.c | 8 | ||||
-rw-r--r-- | runtime/stream.h | 2 | ||||
-rw-r--r-- | runtime/typedefs.h | 3 | ||||
-rw-r--r-- | runtime/unicode-helper.h | 14 | ||||
-rw-r--r-- | runtime/wti.c | 121 | ||||
-rw-r--r-- | runtime/wti.h | 174 | ||||
-rw-r--r-- | runtime/wtp.c | 6 |
28 files changed, 1034 insertions, 592 deletions
diff --git a/runtime/Makefile.in b/runtime/Makefile.in index 80715fe..4223dc4 100644 --- a/runtime/Makefile.in +++ b/runtime/Makefile.in @@ -1,9 +1,8 @@ -# Makefile.in generated by automake 1.11.3 from Makefile.am. +# Makefile.in generated by automake 1.13.4 from Makefile.am. # @configure_input@ -# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, -# 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011 Free Software -# Foundation, Inc. +# Copyright (C) 1994-2013 Free Software Foundation, Inc. + # This Makefile.in is free software; the Free Software Foundation # gives unlimited permission to copy and/or distribute it, # with or without modifications, as long as this notice is preserved. @@ -17,6 +16,51 @@ VPATH = @srcdir@ +am__is_gnu_make = test -n '$(MAKEFILE_LIST)' && test -n '$(MAKELEVEL)' +am__make_running_with_option = \ + case $${target_option-} in \ + ?) ;; \ + *) echo "am__make_running_with_option: internal error: invalid" \ + "target option '$${target_option-}' specified" >&2; \ + exit 1;; \ + esac; \ + has_opt=no; \ + sane_makeflags=$$MAKEFLAGS; \ + if $(am__is_gnu_make); then \ + sane_makeflags=$$MFLAGS; \ + else \ + case $$MAKEFLAGS in \ + *\\[\ \ ]*) \ + bs=\\; \ + sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \ + | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \ + esac; \ + fi; \ + skip_next=no; \ + strip_trailopt () \ + { \ + flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \ + }; \ + for flg in $$sane_makeflags; do \ + test $$skip_next = yes && { skip_next=no; continue; }; \ + case $$flg in \ + *=*|--*) continue;; \ + -*I) strip_trailopt 'I'; skip_next=yes;; \ + -*I?*) strip_trailopt 'I';; \ + -*O) strip_trailopt 'O'; skip_next=yes;; \ + -*O?*) strip_trailopt 'O';; \ + -*l) strip_trailopt 'l'; skip_next=yes;; \ + -*l?*) strip_trailopt 'l';; \ + -[dEDm]) skip_next=yes;; \ + -[JT]) skip_next=yes;; \ + esac; \ + case $$flg in \ + *$$target_option*) has_opt=yes; break;; \ + esac; \ + done; \ + test $$has_opt = yes +am__make_dryrun = (target_option=n; $(am__make_running_with_option)) +am__make_keepgoing = (target_option=k; $(am__make_running_with_option)) pkgdatadir = $(datadir)/@PACKAGE@ pkgincludedir = $(includedir)/@PACKAGE@ pkglibdir = $(libdir)/@PACKAGE@ @@ -72,7 +116,8 @@ sbin_PROGRAMS = @ENABLE_GUARDTIME_TRUE@am__append_7 = librsgt.la @ENABLE_GUARDTIME_TRUE@am__append_8 = lmsig_gt.la subdir = runtime -DIST_COMMON = $(srcdir)/Makefile.am $(srcdir)/Makefile.in +DIST_COMMON = $(srcdir)/Makefile.in $(srcdir)/Makefile.am \ + $(top_srcdir)/depcomp ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 am__aclocal_m4_deps = $(top_srcdir)/m4/atomic_operations.m4 \ $(top_srcdir)/m4/atomic_operations_64bit.m4 \ @@ -122,6 +167,7 @@ libgcry_la_OBJECTS = $(am_libgcry_la_OBJECTS) AM_V_lt = $(am__v_lt_@AM_V@) am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@) am__v_lt_0 = --silent +am__v_lt_1 = @ENABLE_LIBGCRYPT_TRUE@am_libgcry_la_rpath = librsgt_la_LIBADD = am__librsgt_la_SOURCES_DIST = librsgt.c librsgt_read.c librsgt.h @@ -245,6 +291,18 @@ lmzlibw_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ $(lmzlibw_la_LDFLAGS) $(LDFLAGS) -o $@ @ENABLE_ZLIB_TRUE@am_lmzlibw_la_rpath = -rpath $(pkglibdir) PROGRAMS = $(sbin_PROGRAMS) +AM_V_P = $(am__v_P_@AM_V@) +am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) +am__v_P_0 = false +am__v_P_1 = : +AM_V_GEN = $(am__v_GEN_@AM_V@) +am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) +am__v_GEN_0 = @echo " GEN " $@; +am__v_GEN_1 = +AM_V_at = $(am__v_at_@AM_V@) +am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) +am__v_at_0 = @ +am__v_at_1 = DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir) depcomp = $(SHELL) $(top_srcdir)/depcomp am__depfiles_maybe = depfiles @@ -257,20 +315,16 @@ LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ $(AM_CFLAGS) $(CFLAGS) AM_V_CC = $(am__v_CC_@AM_V@) am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@) -am__v_CC_0 = @echo " CC " $@; -AM_V_at = $(am__v_at_@AM_V@) -am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) -am__v_at_0 = @ +am__v_CC_0 = @echo " CC " $@; +am__v_CC_1 = CCLD = $(CC) LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ $(AM_LDFLAGS) $(LDFLAGS) -o $@ AM_V_CCLD = $(am__v_CCLD_@AM_V@) am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@) -am__v_CCLD_0 = @echo " CCLD " $@; -AM_V_GEN = $(am__v_GEN_@AM_V@) -am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) -am__v_GEN_0 = @echo " GEN " $@; +am__v_CCLD_0 = @echo " CCLD " $@; +am__v_CCLD_1 = SOURCES = $(libgcry_la_SOURCES) $(librsgt_la_SOURCES) \ $(librsyslog_la_SOURCES) $(lmcry_gcry_la_SOURCES) \ $(lmnet_la_SOURCES) $(lmnetstrms_la_SOURCES) \ @@ -287,6 +341,28 @@ DIST_SOURCES = $(am__libgcry_la_SOURCES_DIST) \ $(am__lmsig_gt_la_SOURCES_DIST) \ $(am__lmstrmsrv_la_SOURCES_DIST) \ $(am__lmzlibw_la_SOURCES_DIST) +am__can_run_installinfo = \ + case $$AM_UPDATE_INFO_DIR in \ + n|no|NO) false;; \ + *) (install-info --version) >/dev/null 2>&1;; \ + esac +am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP) +# Read a list of newline-separated strings from the standard input, +# and print each of them once, without duplicates. Input order is +# *not* preserved. +am__uniquify_input = $(AWK) '\ + BEGIN { nonempty = 0; } \ + { items[$$0] = 1; nonempty = 1; } \ + END { if (nonempty) { for (i in items) print i; }; } \ +' +# Make sure the list of sources is unique. This is necessary because, +# e.g., the same source file might be shared among _SOURCES variables +# for different programs/libraries. +am__define_uniq_tagged_files = \ + list='$(am__tagged_files)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | $(am__uniquify_input)` ETAGS = etags CTAGS = ctags DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) @@ -346,8 +422,6 @@ LEXLIB = @LEXLIB@ LEX_OUTPUT_ROOT = @LEX_OUTPUT_ROOT@ LIBDBI_CFLAGS = @LIBDBI_CFLAGS@ LIBDBI_LIBS = @LIBDBI_LIBS@ -LIBEE_CFLAGS = @LIBEE_CFLAGS@ -LIBEE_LIBS = @LIBEE_LIBS@ LIBESTR_CFLAGS = @LIBESTR_CFLAGS@ LIBESTR_LIBS = @LIBESTR_LIBS@ LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@ @@ -680,15 +754,17 @@ $(am__aclocal_m4_deps): clean-noinstLTLIBRARIES: -test -z "$(noinst_LTLIBRARIES)" || rm -f $(noinst_LTLIBRARIES) - @list='$(noinst_LTLIBRARIES)'; for p in $$list; do \ - dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \ - test "$$dir" != "$$p" || dir=.; \ - echo "rm -f \"$${dir}/so_locations\""; \ - rm -f "$${dir}/so_locations"; \ - done + @list='$(noinst_LTLIBRARIES)'; \ + locs=`for p in $$list; do echo $$p; done | \ + sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \ + sort -u`; \ + test -z "$$locs" || { \ + echo rm -f $${locs}; \ + rm -f $${locs}; \ + } + install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES) @$(NORMAL_INSTALL) - test -z "$(pkglibdir)" || $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ list2=; for p in $$list; do \ if test -f $$p; then \ @@ -696,6 +772,8 @@ install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES) else :; fi; \ done; \ test -z "$$list2" || { \ + echo " $(MKDIR_P) '$(DESTDIR)$(pkglibdir)'"; \ + $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" || exit 1; \ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(pkglibdir)'"; \ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(pkglibdir)"; \ } @@ -711,14 +789,18 @@ uninstall-pkglibLTLIBRARIES: clean-pkglibLTLIBRARIES: -test -z "$(pkglib_LTLIBRARIES)" || rm -f $(pkglib_LTLIBRARIES) - @list='$(pkglib_LTLIBRARIES)'; for p in $$list; do \ - dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \ - test "$$dir" != "$$p" || dir=.; \ - echo "rm -f \"$${dir}/so_locations\""; \ - rm -f "$${dir}/so_locations"; \ - done + @list='$(pkglib_LTLIBRARIES)'; \ + locs=`for p in $$list; do echo $$p; done | \ + sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \ + sort -u`; \ + test -z "$$locs" || { \ + echo rm -f $${locs}; \ + rm -f $${locs}; \ + } + libgcry.la: $(libgcry_la_OBJECTS) $(libgcry_la_DEPENDENCIES) $(EXTRA_libgcry_la_DEPENDENCIES) $(AM_V_CCLD)$(LINK) $(am_libgcry_la_rpath) $(libgcry_la_OBJECTS) $(libgcry_la_LIBADD) $(LIBS) + librsgt.la: $(librsgt_la_OBJECTS) $(librsgt_la_DEPENDENCIES) $(EXTRA_librsgt_la_DEPENDENCIES) $(AM_V_CCLD)$(LINK) $(am_librsgt_la_rpath) $(librsgt_la_OBJECTS) $(librsgt_la_LIBADD) $(LIBS) ../$(am__dirstamp): @@ -737,36 +819,51 @@ librsgt.la: $(librsgt_la_OBJECTS) $(librsgt_la_DEPENDENCIES) $(EXTRA_librsgt_la_ ../$(DEPDIR)/$(am__dirstamp) ../librsyslog_la-template.lo: ../$(am__dirstamp) \ ../$(DEPDIR)/$(am__dirstamp) + librsyslog.la: $(librsyslog_la_OBJECTS) $(librsyslog_la_DEPENDENCIES) $(EXTRA_librsyslog_la_DEPENDENCIES) $(AM_V_CCLD)$(LINK) $(librsyslog_la_OBJECTS) $(librsyslog_la_LIBADD) $(LIBS) + lmcry_gcry.la: $(lmcry_gcry_la_OBJECTS) $(lmcry_gcry_la_DEPENDENCIES) $(EXTRA_lmcry_gcry_la_DEPENDENCIES) $(AM_V_CCLD)$(lmcry_gcry_la_LINK) $(am_lmcry_gcry_la_rpath) $(lmcry_gcry_la_OBJECTS) $(lmcry_gcry_la_LIBADD) $(LIBS) + lmnet.la: $(lmnet_la_OBJECTS) $(lmnet_la_DEPENDENCIES) $(EXTRA_lmnet_la_DEPENDENCIES) $(AM_V_CCLD)$(lmnet_la_LINK) $(am_lmnet_la_rpath) $(lmnet_la_OBJECTS) $(lmnet_la_LIBADD) $(LIBS) + lmnetstrms.la: $(lmnetstrms_la_OBJECTS) $(lmnetstrms_la_DEPENDENCIES) $(EXTRA_lmnetstrms_la_DEPENDENCIES) $(AM_V_CCLD)$(lmnetstrms_la_LINK) $(am_lmnetstrms_la_rpath) $(lmnetstrms_la_OBJECTS) $(lmnetstrms_la_LIBADD) $(LIBS) + lmnsd_gtls.la: $(lmnsd_gtls_la_OBJECTS) $(lmnsd_gtls_la_DEPENDENCIES) $(EXTRA_lmnsd_gtls_la_DEPENDENCIES) $(AM_V_CCLD)$(lmnsd_gtls_la_LINK) $(am_lmnsd_gtls_la_rpath) $(lmnsd_gtls_la_OBJECTS) $(lmnsd_gtls_la_LIBADD) $(LIBS) + lmnsd_ptcp.la: $(lmnsd_ptcp_la_OBJECTS) $(lmnsd_ptcp_la_DEPENDENCIES) $(EXTRA_lmnsd_ptcp_la_DEPENDENCIES) $(AM_V_CCLD)$(lmnsd_ptcp_la_LINK) $(am_lmnsd_ptcp_la_rpath) $(lmnsd_ptcp_la_OBJECTS) $(lmnsd_ptcp_la_LIBADD) $(LIBS) + lmregexp.la: $(lmregexp_la_OBJECTS) $(lmregexp_la_DEPENDENCIES) $(EXTRA_lmregexp_la_DEPENDENCIES) $(AM_V_CCLD)$(lmregexp_la_LINK) $(am_lmregexp_la_rpath) $(lmregexp_la_OBJECTS) $(lmregexp_la_LIBADD) $(LIBS) + lmsig_gt.la: $(lmsig_gt_la_OBJECTS) $(lmsig_gt_la_DEPENDENCIES) $(EXTRA_lmsig_gt_la_DEPENDENCIES) $(AM_V_CCLD)$(lmsig_gt_la_LINK) $(am_lmsig_gt_la_rpath) $(lmsig_gt_la_OBJECTS) $(lmsig_gt_la_LIBADD) $(LIBS) + lmstrmsrv.la: $(lmstrmsrv_la_OBJECTS) $(lmstrmsrv_la_DEPENDENCIES) $(EXTRA_lmstrmsrv_la_DEPENDENCIES) $(AM_V_CCLD)$(lmstrmsrv_la_LINK) $(am_lmstrmsrv_la_rpath) $(lmstrmsrv_la_OBJECTS) $(lmstrmsrv_la_LIBADD) $(LIBS) + lmzlibw.la: $(lmzlibw_la_OBJECTS) $(lmzlibw_la_DEPENDENCIES) $(EXTRA_lmzlibw_la_DEPENDENCIES) $(AM_V_CCLD)$(lmzlibw_la_LINK) $(am_lmzlibw_la_rpath) $(lmzlibw_la_OBJECTS) $(lmzlibw_la_LIBADD) $(LIBS) install-sbinPROGRAMS: $(sbin_PROGRAMS) @$(NORMAL_INSTALL) - test -z "$(sbindir)" || $(MKDIR_P) "$(DESTDIR)$(sbindir)" @list='$(sbin_PROGRAMS)'; test -n "$(sbindir)" || list=; \ + if test -n "$$list"; then \ + echo " $(MKDIR_P) '$(DESTDIR)$(sbindir)'"; \ + $(MKDIR_P) "$(DESTDIR)$(sbindir)" || exit 1; \ + fi; \ for p in $$list; do echo "$$p $$p"; done | \ sed 's/$(EXEEXT)$$//' | \ - while read p p1; do if test -f $$p || test -f $$p1; \ - then echo "$$p"; echo "$$p"; else :; fi; \ + while read p p1; do if test -f $$p \ + || test -f $$p1 \ + ; then echo "$$p"; echo "$$p"; else :; fi; \ done | \ - sed -e 'p;s,.*/,,;n;h' -e 's|.*|.|' \ + sed -e 'p;s,.*/,,;n;h' \ + -e 's|.*|.|' \ -e 'p;x;s,.*/,,;s/$(EXEEXT)$$//;$(transform);s/$$/$(EXEEXT)/' | \ sed 'N;N;N;s,\n, ,g' | \ $(AWK) 'BEGIN { files["."] = ""; dirs["."] = 1 } \ @@ -787,7 +884,8 @@ uninstall-sbinPROGRAMS: @list='$(sbin_PROGRAMS)'; test -n "$(sbindir)" || list=; \ files=`for p in $$list; do echo "$$p"; done | \ sed -e 'h;s,^.*/,,;s/$(EXEEXT)$$//;$(transform)' \ - -e 's/$$/$(EXEEXT)/' `; \ + -e 's/$$/$(EXEEXT)/' \ + `; \ test -n "$$list" || exit 0; \ echo " ( cd '$(DESTDIR)$(sbindir)' && rm -f" $$files ")"; \ cd "$(DESTDIR)$(sbindir)" && rm -f $$files @@ -803,16 +901,8 @@ clean-sbinPROGRAMS: mostlyclean-compile: -rm -f *.$(OBJEXT) - -rm -f ../librsyslog_la-action.$(OBJEXT) - -rm -f ../librsyslog_la-action.lo - -rm -f ../librsyslog_la-outchannel.$(OBJEXT) - -rm -f ../librsyslog_la-outchannel.lo - -rm -f ../librsyslog_la-parse.$(OBJEXT) - -rm -f ../librsyslog_la-parse.lo - -rm -f ../librsyslog_la-template.$(OBJEXT) - -rm -f ../librsyslog_la-template.lo - -rm -f ../librsyslog_la-threads.$(OBJEXT) - -rm -f ../librsyslog_la-threads.lo + -rm -f ../*.$(OBJEXT) + -rm -f ../*.lo distclean-compile: -rm -f *.tab.c @@ -1283,26 +1373,15 @@ clean-libtool: -rm -rf .libs _libs -rm -rf ../.libs ../_libs -ID: $(HEADERS) $(SOURCES) $(LISP) $(TAGS_FILES) - list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ - unique=`for i in $$list; do \ - if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ - done | \ - $(AWK) '{ files[$$0] = 1; nonempty = 1; } \ - END { if (nonempty) { for (i in files) print i; }; }'`; \ - mkid -fID $$unique -tags: TAGS - -TAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ - $(TAGS_FILES) $(LISP) +ID: $(am__tagged_files) + $(am__define_uniq_tagged_files); mkid -fID $$unique +tags: tags-am +TAGS: tags + +tags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) set x; \ here=`pwd`; \ - list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ - unique=`for i in $$list; do \ - if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ - done | \ - $(AWK) '{ files[$$0] = 1; nonempty = 1; } \ - END { if (nonempty) { for (i in files) print i; }; }'`; \ + $(am__define_uniq_tagged_files); \ shift; \ if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \ test -n "$$unique" || unique=$$empty_fix; \ @@ -1314,15 +1393,11 @@ TAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ $$unique; \ fi; \ fi -ctags: CTAGS -CTAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ - $(TAGS_FILES) $(LISP) - list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ - unique=`for i in $$list; do \ - if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ - done | \ - $(AWK) '{ files[$$0] = 1; nonempty = 1; } \ - END { if (nonempty) { for (i in files) print i; }; }'`; \ +ctags: ctags-am + +CTAGS: ctags +ctags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + $(am__define_uniq_tagged_files); \ test -z "$(CTAGS_ARGS)$$unique" \ || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \ $$unique @@ -1331,6 +1406,21 @@ GTAGS: here=`$(am__cd) $(top_builddir) && pwd` \ && $(am__cd) $(top_srcdir) \ && gtags -i $(GTAGS_ARGS) "$$here" +cscopelist: cscopelist-am + +cscopelist-am: $(am__tagged_files) + list='$(am__tagged_files)'; \ + case "$(srcdir)" in \ + [\\/]* | ?:[\\/]*) sdir="$(srcdir)" ;; \ + *) sdir=$(subdir)/$(srcdir) ;; \ + esac; \ + for i in $$list; do \ + if test -f "$$i"; then \ + echo "$(subdir)/$$i"; \ + else \ + echo "$$sdir/$$i"; \ + fi; \ + done >> $(top_builddir)/cscope.files distclean-tags: -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags @@ -1477,21 +1567,21 @@ uninstall-am: uninstall-pkglibLTLIBRARIES uninstall-sbinPROGRAMS .MAKE: install-am install-strip -.PHONY: CTAGS GTAGS all all-am check check-am clean clean-generic \ +.PHONY: CTAGS GTAGS TAGS all all-am check check-am clean clean-generic \ clean-libtool clean-noinstLTLIBRARIES clean-pkglibLTLIBRARIES \ - clean-sbinPROGRAMS ctags distclean distclean-compile \ - distclean-generic distclean-libtool distclean-tags distdir dvi \ - dvi-am html html-am info info-am install install-am \ - install-data install-data-am install-dvi install-dvi-am \ - install-exec install-exec-am install-html install-html-am \ - install-info install-info-am install-man install-pdf \ - install-pdf-am install-pkglibLTLIBRARIES install-ps \ - install-ps-am install-sbinPROGRAMS install-strip installcheck \ - installcheck-am installdirs maintainer-clean \ + clean-sbinPROGRAMS cscopelist-am ctags ctags-am distclean \ + distclean-compile distclean-generic distclean-libtool \ + distclean-tags distdir dvi dvi-am html html-am info info-am \ + install install-am install-data install-data-am install-dvi \ + install-dvi-am install-exec install-exec-am install-html \ + install-html-am install-info install-info-am install-man \ + install-pdf install-pdf-am install-pkglibLTLIBRARIES \ + install-ps install-ps-am install-sbinPROGRAMS install-strip \ + installcheck installcheck-am installdirs maintainer-clean \ maintainer-clean-generic mostlyclean mostlyclean-compile \ mostlyclean-generic mostlyclean-libtool pdf pdf-am ps ps-am \ - tags uninstall uninstall-am uninstall-pkglibLTLIBRARIES \ - uninstall-sbinPROGRAMS + tags tags-am uninstall uninstall-am \ + uninstall-pkglibLTLIBRARIES uninstall-sbinPROGRAMS update-systemd: diff --git a/runtime/batch.h b/runtime/batch.h index 2ec0767..e3fa045 100644 --- a/runtime/batch.h +++ b/runtime/batch.h @@ -2,7 +2,7 @@ * I am not sure yet if this will become a full-blown object. For now, this header just * includes the object definition and is not accompanied by code. * - * Copyright 2009 by Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2013 by Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -46,17 +46,6 @@ typedef unsigned char batch_state_t; */ struct batch_obj_s { msg_t *pMsg; - /* work variables for action processing; these are reused for each action (or block of - * actions) - */ - sbool bPrevWasSuspended; - /* following are caches to save allocs if not absolutely necessary */ - uchar *staticActStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for strings */ - /* a cache to save malloc(), if not absolutely necessary */ - void *staticActParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; /**< for anything else */ - size_t staticLenStrings[CONF_OMOD_NUMSTRINGS_MAXSIZE]; - /* and the same for the message length (if used) */ - /* end action work variables */ }; /* the batch @@ -77,11 +66,7 @@ struct batch_s { int maxElem; /* maximum number of elements that this batch supports */ int nElem; /* actual number of element in this entry */ int nElemDeq; /* actual number of elements dequeued (and thus to be deleted) - see comment above! */ - int iDoneUpTo; /* all messages below this index have state other than RDY */ qDeqID deqID; /* ID of dequeue operation that generated this batch */ - int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ - sbool *active; /* which messages are active for processing, NULL=all */ - sbool bSingleRuleset; /* do all msgs of this batch use a single ruleset? */ batch_obj_t *pElem; /* batch elements */ batch_state_t *eltState;/* state (array!) for individual objects. NOTE: we have moved this out of batch_obj_t because we @@ -93,27 +78,9 @@ struct batch_s { }; -/* some inline functions (we may move this off to an object .. or not) */ -static inline void -batchSetSingleRuleset(batch_t *pBatch, sbool val) { - pBatch->bSingleRuleset = val; -} - -/* get the batches ruleset (if we have a single ruleset) */ -static inline ruleset_t* -batchGetRuleset(batch_t *pBatch) { - return (pBatch->nElem > 0) ? pBatch->pElem[0].pMsg->pRuleset : NULL; -} - -/* get the ruleset of a specifc element of the batch (index not verified!) */ -static inline ruleset_t* -batchElemGetRuleset(batch_t *pBatch, int i) { - return pBatch->pElem[i].pMsg->pRuleset; -} - /* get number of msgs for this batch */ static inline int -batchNumMsgs(batch_t *pBatch) { +batchNumMsgs(const batch_t * const pBatch) { return pBatch->nElem; } @@ -123,7 +90,7 @@ batchNumMsgs(batch_t *pBatch) { * the state table. -- rgerhards, 2010-06-10 */ static inline void -batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) { +batchSetElemState(batch_t * const pBatch, const int i, const batch_state_t newState) { if(pBatch->eltState[i] != BATCH_STATE_DISC) pBatch->eltState[i] = newState; } @@ -133,9 +100,8 @@ batchSetElemState(batch_t *pBatch, int i, batch_state_t newState) { * element index is valid. -- rgerhards, 2010-06-10 */ static inline int -batchIsValidElem(batch_t *pBatch, int i) { - return( (pBatch->eltState[i] != BATCH_STATE_DISC) - && (pBatch->active == NULL || pBatch->active[i])); +batchIsValidElem(const batch_t * const pBatch, const int i) { + return(pBatch->eltState[i] != BATCH_STATE_DISC); } @@ -144,17 +110,7 @@ batchIsValidElem(batch_t *pBatch, int i) { * object itself cannot be freed! -- rgerhards, 2010-06-15 */ static inline void -batchFree(batch_t *pBatch) { - int i; - int j; - for(i = 0 ; i < pBatch->maxElem ; ++i) { - for(j = 0 ; j < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++j) { - /* staticActParams MUST be freed immediately (if required), - * so we do not need to do that! - */ - free(pBatch->pElem[i].staticActStrings[j]); - } - } +batchFree(batch_t * const pBatch) { free(pBatch->pElem); free(pBatch->eltState); } @@ -165,13 +121,11 @@ batchFree(batch_t *pBatch) { * provided. -- rgerhards, 2010-06-15 */ static inline rsRetVal -batchInit(batch_t *pBatch, int maxElem) { +batchInit(batch_t *const pBatch, const int maxElem) { DEFiRet; - pBatch->iDoneUpTo = 0; pBatch->maxElem = maxElem; CHKmalloc(pBatch->pElem = calloc((size_t)maxElem, sizeof(batch_obj_t))); CHKmalloc(pBatch->eltState = calloc((size_t)maxElem, sizeof(batch_state_t))); - // TODO: replace calloc by inidividual writes? finalize_it: RETiRet; } @@ -179,7 +133,7 @@ finalize_it: /* primarily a helper for debug purposes, get human-readble name of state */ static inline char * -batchState2String(batch_state_t state) { +batchState2String(const batch_state_t state) { switch(state) { case BATCH_STATE_RDY: return "BATCH_STATE_RDY"; diff --git a/runtime/conf.c b/runtime/conf.c index 2b000c6..83931bc 100644 --- a/runtime/conf.c +++ b/runtime/conf.c @@ -518,12 +518,10 @@ rsRetVal cflineDoAction(rsconf_t *conf, uchar **p, action_t **ppAction) bHadWarning = 1; iRet = RS_RET_OK; } - if(iRet == RS_RET_OK || iRet == RS_RET_SUSPENDED) { - if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL, - (iRet == RS_RET_SUSPENDED)? 1 : 0)) == RS_RET_OK) { + if(iRet == RS_RET_OK) { + if((iRet = addAction(&pAction, pMod, pModData, pOMSR, NULL, NULL)) == RS_RET_OK) { /* here check if the module is compatible with select features * (currently, we have no such features!) */ - pAction->eState = ACT_STATE_RDY; /* action is enabled */ conf->actions.nbrActions++; /* one more active action! */ } break; diff --git a/runtime/glbl.c b/runtime/glbl.c index 6715374..73e986e 100644 --- a/runtime/glbl.c +++ b/runtime/glbl.c @@ -64,6 +64,7 @@ DEFobjCurrIf(net) * class... */ int glblDebugOnShutdown = 0; /* start debug log when we are shut down */ +stdlog_channel_t stdlog_hdl = NULL; /* handle to be used for stdlog */ static struct cnfobj *mainqCnfObj = NULL;/* main queue object, to be used later in startup sequence */ int bProcessInternalMessages = 1; /* Should rsyslog itself process internal messages? @@ -71,6 +72,7 @@ int bProcessInternalMessages = 1; /* Should rsyslog itself process internal mess * 0 - send them to libstdlog (e.g. to push to journal) */ static uchar *pszWorkDir = NULL; +static uchar *stdlog_chanspec = NULL; static int bOptimizeUniProc = 1; /* enable uniprocessor optimizations */ static int bParseHOSTNAMEandTAG = 1; /* parser modification (based on startup params!) */ static int bPreserveFQDN = 0; /* should FQDNs always be preserved? */ @@ -93,6 +95,14 @@ static uchar *pszDfltNetstrmDrvrCAF = NULL; /* default CA file for the netstrm d static uchar *pszDfltNetstrmDrvrKeyFile = NULL; /* default key file for the netstrm driver (server) */ static uchar *pszDfltNetstrmDrvrCertFile = NULL; /* default cert file for the netstrm driver (server) */ static int bTerminateInputs = 0; /* global switch that inputs shall terminate ASAP (1=> terminate) */ +static uchar cCCEscapeChar = '#'; /* character to be used to start an escape sequence for control chars */ +static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */ +static int bEscapeCCOnRcv = 1; /* escape control characters on reception: 0 - no, 1 - yes */ +static int bSpaceLFOnRcv = 0; /* replace newlines with spaces on reception: 0 - no, 1 - yes */ +static int bEscape8BitChars = 0; /* escape characters > 127 on reception: 0 - no, 1 - yes */ +static int bEscapeTab = 1; /* escape tab control character when doing CC escapes: 0 - no, 1 - yes */ +static int bParserEscapeCCCStyle = 0; /* escape control characters in c style: 0 - no, 1 - yes */ + pid_t glbl_ourpid; #ifndef HAVE_ATOMIC_BUILTINS static DEF_ATOMIC_HELPER_MUT(mutTerminateInputs); @@ -113,10 +123,19 @@ static struct cnfparamdescr cnfparamdescr[] = { { "debug.logfile", eCmdHdlrString, 0 }, { "defaultnetstreamdrivercafile", eCmdHdlrString, 0 }, { "defaultnetstreamdriverkeyfile", eCmdHdlrString, 0 }, + { "defaultnetstreamdrivercertfile", eCmdHdlrString, 0 }, { "defaultnetstreamdriver", eCmdHdlrString, 0 }, { "maxmessagesize", eCmdHdlrSize, 0 }, { "action.reportsuspension", eCmdHdlrBinary, 0 }, { "action.reportsuspensioncontinuation", eCmdHdlrBinary, 0 }, + { "parser.controlcharacterescapeprefix", eCmdHdlrGetChar, 0 }, + { "parser.droptrailinglfonreception", eCmdHdlrBinary, 0 }, + { "parser.escapecontrolcharactersonreceive", eCmdHdlrBinary, 0 }, + { "parser.spacelfonreceive", eCmdHdlrBinary, 0 }, + { "parser.escape8bitcharactersonreceive", eCmdHdlrBinary, 0}, + { "parser.escapecontrolcharactertab", eCmdHdlrBinary, 0}, + { "parser.escapecontrolcharacterscstyle", eCmdHdlrBinary, 0 }, + { "stdlog.channelspec", eCmdHdlrString, 0 }, { "processinternalmessages", eCmdHdlrBinary, 0 } }; static struct cnfparamblk paramblk = @@ -161,6 +180,13 @@ SIMP_PROP(Option_DisallowWarning, option_DisallowWarning, int) SIMP_PROP(DisableDNS, bDisableDNS, int) SIMP_PROP(StripDomains, StripDomains, char**) SIMP_PROP(LocalHosts, LocalHosts, char**) +SIMP_PROP(ParserControlCharacterEscapePrefix, cCCEscapeChar, uchar) +SIMP_PROP(ParserDropTrailingLFOnReception, bDropTrailingLF, int) +SIMP_PROP(ParserEscapeControlCharactersOnReceive, bEscapeCCOnRcv, int) +SIMP_PROP(ParserSpaceLFOnReceive, bSpaceLFOnRcv, int) +SIMP_PROP(ParserEscape8BitCharactersOnReceive, bEscape8BitChars, int) +SIMP_PROP(ParserEscapeControlCharacterTab, bEscapeTab, int) +SIMP_PROP(ParserEscapeControlCharactersCStyle, bParserEscapeCCCStyle, int) #ifdef USE_UNLIMITED_SELECT SIMP_PROP(FdSetSize, iFdSetSize, int) #endif @@ -586,6 +612,13 @@ CODESTARTobjQueryInterface(glbl) SIMP_PROP(LocalDomain) SIMP_PROP(StripDomains) SIMP_PROP(LocalHosts) + SIMP_PROP(ParserControlCharacterEscapePrefix) + SIMP_PROP(ParserDropTrailingLFOnReception) + SIMP_PROP(ParserEscapeControlCharactersOnReceive) + SIMP_PROP(ParserSpaceLFOnReceive) + SIMP_PROP(ParserEscape8BitCharactersOnReceive) + SIMP_PROP(ParserEscapeControlCharacterTab) + SIMP_PROP(ParserEscapeControlCharactersCStyle) SIMP_PROP(DfltNetstrmDrvr) SIMP_PROP(DfltNetstrmDrvrCAF) SIMP_PROP(DfltNetstrmDrvrKeyFile) @@ -619,6 +652,13 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a bOptimizeUniProc = 1; bPreserveFQDN = 0; iMaxLine = 8192; + cCCEscapeChar = '#'; + bDropTrailingLF = 1; + bEscapeCCOnRcv = 1; /* default is to escape control characters */ + bSpaceLFOnRcv = 0; + bEscape8BitChars = 0; /* default is not to escape control characters */ + bEscapeTab = 1; /* default is to escape tab characters */ + bParserEscapeCCCStyle = 0; #ifdef USE_UNLIMITED_SELECT iFdSetSize = howmany(FD_SETSIZE, __NFDBITS) * sizeof (fd_mask); #endif @@ -660,6 +700,11 @@ glblProcessCnf(struct cnfobj *o) continue; if(!strcmp(paramblk.descr[i].name, "processinternalmessages")) { bProcessInternalMessages = (int) cnfparamvals[i].val.d.n; + } else if(!strcmp(paramblk.descr[i].name, "stdlog.channelspec")) { + stdlog_chanspec = (uchar*) + es_str2cstr(cnfparamvals[i].val.d.estr, NULL); + stdlog_hdl = stdlog_open("rsyslogd", 0, STDLOG_SYSLOG, + (char*) stdlog_chanspec); } } } @@ -719,6 +764,10 @@ glblDoneLoadCnf(void) free(pszDfltNetstrmDrvrKeyFile); pszDfltNetstrmDrvrKeyFile = (uchar*) es_str2cstr(cnfparamvals[i].val.d.estr, NULL); + } else if(!strcmp(paramblk.descr[i].name, "defaultnetstreamdrivercertfile")) { + free(pszDfltNetstrmDrvrCertFile); + pszDfltNetstrmDrvrCertFile = (uchar*) + es_str2cstr(cnfparamvals[i].val.d.estr, NULL); } else if(!strcmp(paramblk.descr[i].name, "defaultnetstreamdrivercafile")) { free(pszDfltNetstrmDrvrCAF); pszDfltNetstrmDrvrCAF = (uchar*) @@ -741,6 +790,20 @@ glblDoneLoadCnf(void) } else if(!strcmp(paramblk.descr[i].name, "debug.onshutdown")) { glblDebugOnShutdown = (int) cnfparamvals[i].val.d.n; errmsg.LogError(0, RS_RET_OK, "debug: onShutdown set to %d", glblDebugOnShutdown); + } else if(!strcmp(paramblk.descr[i].name, "parser.controlcharacterescapeprefix")) { + cCCEscapeChar = (uchar) *es_str2cstr(cnfparamvals[i].val.d.estr, NULL); + } else if(!strcmp(paramblk.descr[i].name, "parser.droptrailinglfonreception")) { + bDropTrailingLF = (int) cnfparamvals[i].val.d.n; + } else if(!strcmp(paramblk.descr[i].name, "parser.escapecontrolcharactersonreceive")) { + bEscapeCCOnRcv = (int) cnfparamvals[i].val.d.n; + } else if(!strcmp(paramblk.descr[i].name, "parser.spacelfonreceive")) { + bSpaceLFOnRcv = (int) cnfparamvals[i].val.d.n; + } else if(!strcmp(paramblk.descr[i].name, "parser.escape8bitcharactersonreceive")) { + bEscape8BitChars = (int) cnfparamvals[i].val.d.n; + } else if(!strcmp(paramblk.descr[i].name, "parser.escapecontrolcharactertab")) { + bEscapeTab = (int) cnfparamvals[i].val.d.n; + } else if(!strcmp(paramblk.descr[i].name, "parser.escapecontrolcharacterscstyle")) { + bParserEscapeCCCStyle = (int) cnfparamvals[i].val.d.n; } else if(!strcmp(paramblk.descr[i].name, "debug.logfile")) { if(pszAltDbgFileName == NULL) { pszAltDbgFileName = es_str2cstr(cnfparamvals[i].val.d.estr, NULL); @@ -785,8 +848,16 @@ BEGINAbstractObjClassInit(glbl, 1, OBJ_IS_CORE_MODULE) /* class, version */ CHKiRet(regCfSysLineHdlr((uchar *)"localhostipif", 0, eCmdHdlrGetWord, setLocalHostIPIF, NULL, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"optimizeforuniprocessor", 0, eCmdHdlrBinary, NULL, &bOptimizeUniProc, NULL)); CHKiRet(regCfSysLineHdlr((uchar *)"preservefqdn", 0, eCmdHdlrBinary, NULL, &bPreserveFQDN, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"maxmessagesize", 0, eCmdHdlrSize, - NULL, &iMaxLine, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"maxmessagesize", 0, eCmdHdlrSize, NULL, &iMaxLine, NULL)); + + /* Deprecated parser config options */ + CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"spacelfonreceive", 0, eCmdHdlrBinary, NULL, &bSpaceLFOnRcv, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"escape8bitcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscape8BitChars, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactertab", 0, eCmdHdlrBinary, NULL, &bEscapeTab, NULL)); + CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); INIT_ATOMIC_HELPER_MUT(mutTerminateInputs); diff --git a/runtime/glbl.h b/runtime/glbl.h index 5ebab21..8a9f038 100644 --- a/runtime/glbl.h +++ b/runtime/glbl.h @@ -8,7 +8,7 @@ * Please note that there currently is no glbl.c file as we do not yet * have any implementations. * - * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2014 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -31,6 +31,7 @@ #define GLBL_H_INCLUDED #include <sys/types.h> +#include <liblogging/stdlog.h> #include "rainerscript.h" #include "prop.h" @@ -38,6 +39,7 @@ extern pid_t glbl_ourpid; extern int bProcessInternalMessages; +extern stdlog_channel_t stdlog_hdl; /* interfaces */ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */ @@ -62,6 +64,14 @@ BEGINinterface(glbl) /* name must also be changed in ENDinterface macro! */ SIMP_PROP(DfltNetstrmDrvrCAF, uchar*) SIMP_PROP(DfltNetstrmDrvrKeyFile, uchar*) SIMP_PROP(DfltNetstrmDrvrCertFile, uchar*) + SIMP_PROP(ParserControlCharacterEscapePrefix, uchar) + SIMP_PROP(ParserDropTrailingLFOnReception, int) + SIMP_PROP(ParserEscapeControlCharactersOnReceive, int) + SIMP_PROP(ParserSpaceLFOnReceive, int) + SIMP_PROP(ParserEscape8BitCharactersOnReceive, int) + SIMP_PROP(ParserEscapeControlCharacterTab, int) + SIMP_PROP(ParserEscapeControlCharactersCStyle, int) + /* added v3, 2009-06-30 */ rsRetVal (*GenerateLocalHostNameProperty)(void); prop_t* (*GetLocalHostNameProp)(void); diff --git a/runtime/lookup.c b/runtime/lookup.c index f2af17b..f7ed899 100644 --- a/runtime/lookup.c +++ b/runtime/lookup.c @@ -123,15 +123,18 @@ bs_arrcmp_strtab(const void *s1, const void *s2) rsRetVal lookupBuildTable(lookup_t *pThis, struct json_object *jroot) { - struct json_object *jversion, *jnomatch, *jtype, *jtab; + //struct json_object *jversion, *jnomatch, *jtype, *jtab; + struct json_object *jtab; struct json_object *jrow, *jindex, *jvalue; uint32_t i; uint32_t maxStrSize; DEFiRet; +#if 0 // enable when we continue to work on this module jversion = json_object_object_get(jroot, "version"); jnomatch = json_object_object_get(jroot, "nomatch"); jtype = json_object_object_get(jroot, "type"); +#endif jtab = json_object_object_get(jroot, "table"); pThis->nmemb = json_object_array_length(jtab); CHKmalloc(pThis->d.strtab = malloc(pThis->nmemb * sizeof(lookup_string_tab_etry_t))); diff --git a/runtime/module-template.h b/runtime/module-template.h index 8a958f9..19b0ca3 100644 --- a/runtime/module-template.h +++ b/runtime/module-template.h @@ -175,6 +175,44 @@ static rsRetVal freeInstance(void* pModData)\ RETiRet;\ } +/* createWrkrInstance() + */ +#define BEGINcreateWrkrInstance \ +static rsRetVal createWrkrInstance(wrkrInstanceData_t **ppWrkrData, instanceData *pData)\ + {\ + DEFiRet; /* store error code here */\ + wrkrInstanceData_t *pWrkrData; /* use this to point to data elements */ + +#define CODESTARTcreateWrkrInstance \ + if((pWrkrData = calloc(1, sizeof(wrkrInstanceData_t))) == NULL) {\ + *ppWrkrData = NULL;\ + ENDfunc \ + return RS_RET_OUT_OF_MEMORY;\ + } \ + pWrkrData->pData = pData; + +#define ENDcreateWrkrInstance \ + *ppWrkrData = pWrkrData;\ + RETiRet;\ +} + +/* freeWrkrInstance */ +#define BEGINfreeWrkrInstance \ +static rsRetVal freeWrkrInstance(void* pd)\ +{\ + DEFiRet;\ + wrkrInstanceData_t *pWrkrData; + +#define CODESTARTfreeWrkrInstance \ + pWrkrData = (wrkrInstanceData_t*) pd; + +#define ENDfreeWrkrInstance \ + if(pWrkrData != NULL)\ + free(pWrkrData); /* we need to free this in any case */\ + RETiRet;\ +} + + /* isCompatibleWithFeature() */ #define BEGINisCompatibleWithFeature \ @@ -194,7 +232,7 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINbeginTransaction \ -static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal beginTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -205,11 +243,28 @@ static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\ } +/* commitTransaction() + * Commits a transaction. Note that beginTransaction() must have been + * called before this entry point. It receives the full batch of messages + * to be processed in pParam parameter. + * introduced in v8.1.3 -- rgerhards, 2013-12-04 + */ +#define BEGINcommitTransaction \ +static rsRetVal commitTransaction(wrkrInstanceData_t __attribute__((unused)) *const pWrkrData, actWrkrIParams_t *const pParams, const unsigned nParams)\ +{\ + DEFiRet; + +#define CODESTARTcommitTransaction /* currently empty, but may be extended */ + +#define ENDcommitTransaction \ + RETiRet;\ +} + /* endTransaction() * introduced in v4.3.3 -- rgerhards, 2009-04-27 */ #define BEGINendTransaction \ -static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ +static rsRetVal endTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -223,7 +278,7 @@ static rsRetVal endTransaction(instanceData __attribute__((unused)) *pData)\ /* doAction() */ #define BEGINdoAction \ -static rsRetVal doAction(uchar __attribute__((unused)) **ppString, unsigned __attribute__((unused)) iMsgOpts, instanceData __attribute__((unused)) *pData)\ +static rsRetVal doAction(uchar __attribute__((unused)) **ppString, wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; @@ -382,12 +437,12 @@ static rsRetVal newInpInst(struct nvlst *lst)\ * rgerhard, 2007-08-02 */ #define BEGINtryResume \ -static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\ +static rsRetVal tryResume(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\ {\ DEFiRet; #define CODESTARTtryResume \ - assert(pData != NULL); + assert(pWrkrData != NULL); #define ENDtryResume \ RETiRet;\ @@ -448,8 +503,7 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ } /* the following definition is the standard block for queryEtryPt for output - * modules. This can be used if no specific handling (e.g. to cover version - * differences) is needed. + * modules WHICH DO NOT SUPPORT TRANSACTIONS. */ #define CODEqueryEtryPt_STD_OMOD_QUERIES \ CODEqueryEtryPt_STD_MOD_QUERIES \ @@ -467,6 +521,34 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\ *pEtryPoint = tryResume;\ } +/* the following definition is the standard block for queryEtryPt for output + * modules using the transaction interface. + */ +#define CODEqueryEtryPt_STD_OMODTX_QUERIES \ + CODEqueryEtryPt_STD_MOD_QUERIES \ + else if(!strcmp((char*) name, "beginTransaction")) {\ + *pEtryPoint = beginTransaction;\ + } else if(!strcmp((char*) name, "commitTransaction")) {\ + *pEtryPoint = commitTransaction;\ + } else if(!strcmp((char*) name, "dbgPrintInstInfo")) {\ + *pEtryPoint = dbgPrintInstInfo;\ + } else if(!strcmp((char*) name, "freeInstance")) {\ + *pEtryPoint = freeInstance;\ + } else if(!strcmp((char*) name, "parseSelectorAct")) {\ + *pEtryPoint = parseSelectorAct;\ + } else if(!strcmp((char*) name, "isCompatibleWithFeature")) {\ + *pEtryPoint = isCompatibleWithFeature;\ + } else if(!strcmp((char*) name, "tryResume")) {\ + *pEtryPoint = tryResume;\ + } + +/* standard queries for output module interface in rsyslog v8+ */ +#define CODEqueryEtryPt_STD_OMOD8_QUERIES \ + else if(!strcmp((char*) name, "createWrkrInstance")) {\ + *pEtryPoint = createWrkrInstance;\ + } else if(!strcmp((char*) name, "freeWrkrInstance")) {\ + *pEtryPoint = freeWrkrInstance;\ + } /* the following definition is queryEtryPt block that must be added * if an output module supports the transactional interface. @@ -976,9 +1058,15 @@ static rsRetVal parse(msg_t *pMsg)\ /* strgen() - main entry point of parser modules + * Note that we do NOT use size_t as this permits us to store the + * values directly into optimized heap structures. + * ppBuf is the buffer pointer + * pLenBuf is the current max size of this buffer + * pStrLen is an output parameter that MUST hold the length + * of the generated string on exit (this is cached) */ #define BEGINstrgen \ -static rsRetVal strgen(msg_t *pMsg, uchar **ppBuf, size_t *pLenBuf) \ +static rsRetVal strgen(msg_t *const pMsg, actWrkrIParams_t *const iparam) \ {\ DEFiRet; diff --git a/runtime/modules.c b/runtime/modules.c index 5660630..ad80a7b 100644 --- a/runtime/modules.c +++ b/runtime/modules.c @@ -649,9 +649,11 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ case eMOD_OUT: CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeInstance", &pNew->freeInstance)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"dbgPrintInstInfo", &pNew->dbgPrintInstInfo)); - CHKiRet((*pNew->modQueryEtryPt)((uchar*)"doAction", &pNew->mod.om.doAction)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"parseSelectorAct", &pNew->mod.om.parseSelectorAct)); CHKiRet((*pNew->modQueryEtryPt)((uchar*)"tryResume", &pNew->tryResume)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"createWrkrInstance", &pNew->mod.om.createWrkrInstance)); + CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeWrkrInstance", &pNew->mod.om.freeWrkrInstance)); + /* try load optional interfaces */ localRet = (*pNew->modQueryEtryPt)((uchar*)"doHUP", &pNew->doHUP); if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) @@ -661,11 +663,56 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_ if(localRet != RS_RET_OK && localRet != RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) ABORT_FINALIZE(localRet); + pNew->mod.om.supportsTX = 1; localRet = (*pNew->modQueryEtryPt)((uchar*)"beginTransaction", &pNew->mod.om.beginTransaction); - if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) + if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) { pNew->mod.om.beginTransaction = dummyBeginTransaction; - else if(localRet != RS_RET_OK) + pNew->mod.om.supportsTX = 0; + } else if(localRet != RS_RET_OK) { ABORT_FINALIZE(localRet); + } + + localRet = (*pNew->modQueryEtryPt)((uchar*)"doAction", + &pNew->mod.om.doAction); + if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) { + pNew->mod.om.doAction = NULL; + } else if(localRet != RS_RET_OK) { + ABORT_FINALIZE(localRet); + } + + localRet = (*pNew->modQueryEtryPt)((uchar*)"commitTransaction", + &pNew->mod.om.commitTransaction); + if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) { + pNew->mod.om.commitTransaction = NULL; + } else if(localRet != RS_RET_OK) { + ABORT_FINALIZE(localRet); + } + + if(pNew->mod.om.doAction == NULL && pNew->mod.om.commitTransaction == NULL) { + errmsg.LogError(0, RS_RET_INVLD_OMOD, + "module %s does neither provide doAction() " + "nor commitTransaction() interface - cannot " + "load", name); + ABORT_FINALIZE(RS_RET_INVLD_OMOD); + } + + if(pNew->mod.om.commitTransaction != NULL) { + if(pNew->mod.om.doAction != NULL){ + errmsg.LogError(0, RS_RET_INVLD_OMOD, + "module %s provides both doAction() " + "and commitTransaction() interface, using " + "commitTransaction()", name); + pNew->mod.om.doAction = NULL; + } + if(pNew->mod.om.beginTransaction == NULL){ + errmsg.LogError(0, RS_RET_INVLD_OMOD, + "module %s provides both commitTransaction() " + "but does not provide beginTransaction() - " + "cannot load", name); + ABORT_FINALIZE(RS_RET_INVLD_OMOD); + } + } + localRet = (*pNew->modQueryEtryPt)((uchar*)"endTransaction", &pNew->mod.om.endTransaction); diff --git a/runtime/modules.h b/runtime/modules.h index 23df22d..c3a9c9e 100644 --- a/runtime/modules.h +++ b/runtime/modules.h @@ -134,11 +134,15 @@ struct modInfo_s { /* below: perform the configured action */ rsRetVal (*beginTransaction)(void*); - rsRetVal (*doAction)(uchar**, unsigned, void*); + rsRetVal (*commitTransaction)(void *const, actWrkrIParams_t *const, const unsigned); + rsRetVal (*doAction)(uchar**, void*); rsRetVal (*endTransaction)(void*); rsRetVal (*parseSelectorAct)(uchar**, void**,omodStringRequest_t**); rsRetVal (*newActInst)(uchar *modName, struct nvlst *lst, void **, omodStringRequest_t **); rsRetVal (*SetShutdownImmdtPtr)(void *pData, void *pPtr); + rsRetVal (*createWrkrInstance)(void*ppWrkrData, void*pData); + rsRetVal (*freeWrkrInstance)(void*pWrkrData); + sbool supportsTX; /* set if the module supports transactions */ } om; struct { /* data for library modules */ char dummy; @@ -147,7 +151,7 @@ struct modInfo_s { rsRetVal (*parse)(msg_t*); } pm; struct { /* data for strgen modules */ - rsRetVal (*strgen)(msg_t*, uchar**, size_t *); + rsRetVal (*strgen)(const msg_t*const, actWrkrIParams_t *const iparam); } sm; } mod; void *pModHdlr; /* handler to the dynamic library holding the module */ diff --git a/runtime/msg.c b/runtime/msg.c index 8863a67..2959796 100644 --- a/runtime/msg.c +++ b/runtime/msg.c @@ -1605,7 +1605,7 @@ getTimeReported(msg_t * const pM, enum tplFormatTypes eFmt) return "INVALID eFmt OPTION!"; } -static inline char *getTimeGenerated(msg_t * const pM, enum tplFormatTypes eFmt) +static char *getTimeGenerated(msg_t * const pM, enum tplFormatTypes eFmt) { BEGINfunc if(pM == NULL) @@ -2622,14 +2622,17 @@ finalize_it: /* Encode a JSON value and add it to provided string. Note that * the string object may be NULL. In this case, it is created - * if and only if escaping is needed. + * if and only if escaping is needed. if escapeAll is false, previously + * escaped strings are left as is */ static rsRetVal -jsonAddVal(uchar *pSrc, unsigned buflen, es_str_t **dst) +jsonAddVal(uchar *pSrc, unsigned buflen, es_str_t **dst, int escapeAll) { unsigned char c; es_size_t i; char numbuf[4]; + unsigned ni; + unsigned char nc; int j; DEFiRet; @@ -2665,6 +2668,23 @@ jsonAddVal(uchar *pSrc, unsigned buflen, es_str_t **dst) es_addBuf(dst, "\\/", 2); break; case '\\': + if (escapeAll == RSFALSE) { + ni = i + 1; + if (ni <= buflen) { + nc = pSrc[ni]; + + /* Attempt to not double encode */ + if ( nc == '"' || nc == '/' || nc == '\\' || nc == 'b' || nc == 'f' + || nc == 'n' || nc == 'r' || nc == 't' || nc == 'u') { + + es_addChar(dst, c); + es_addChar(dst, nc); + i = ni; + break; + } + } + } + es_addBuf(dst, "\\\\", 2); break; case '\010': @@ -2710,7 +2730,7 @@ finalize_it: * rgerhards, 2012-03-16 */ static rsRetVal -jsonEncode(uchar **ppRes, unsigned short *pbMustBeFreed, int *pBufLen) +jsonEncode(uchar **ppRes, unsigned short *pbMustBeFreed, int *pBufLen, int escapeAll) { unsigned buflen; uchar *pSrc; @@ -2719,7 +2739,7 @@ jsonEncode(uchar **ppRes, unsigned short *pbMustBeFreed, int *pBufLen) pSrc = *ppRes; buflen = (*pBufLen == -1) ? ustrlen(pSrc) : *pBufLen; - CHKiRet(jsonAddVal(pSrc, buflen, &dst)); + CHKiRet(jsonAddVal(pSrc, buflen, &dst, escapeAll)); if(dst != NULL) { /* we updated the string and need to replace the @@ -2748,7 +2768,7 @@ finalize_it: * something to consider at a later stage. rgerhards, 2012-04-19 */ static rsRetVal -jsonField(struct templateEntry *pTpe, uchar **ppRes, unsigned short *pbMustBeFreed, int *pBufLen) +jsonField(struct templateEntry *pTpe, uchar **ppRes, unsigned short *pbMustBeFreed, int *pBufLen, int escapeAll) { unsigned buflen; uchar *pSrc; @@ -2762,7 +2782,7 @@ jsonField(struct templateEntry *pTpe, uchar **ppRes, unsigned short *pbMustBeFre es_addChar(&dst, '"'); es_addBuf(&dst, (char*)pTpe->fieldName, pTpe->lenFieldName); es_addBufConstcstr(&dst, "\":\""); - CHKiRet(jsonAddVal(pSrc, buflen, &dst)); + CHKiRet(jsonAddVal(pSrc, buflen, &dst, escapeAll)); es_addChar(&dst, '"'); if(*pbMustBeFreed) @@ -2819,9 +2839,9 @@ finalize_it: #define RET_OUT_OF_MEMORY { *pbMustBeFreed = 0;\ *pPropLen = sizeof("**OUT OF MEMORY**") - 1; \ return(UCHAR_CONSTANT("**OUT OF MEMORY**"));} -uchar *MsgGetProp(msg_t * const pMsg, struct templateEntry *pTpe, - msgPropDescr_t *pProp, rs_size_t *pPropLen, - unsigned short *pbMustBeFreed, struct syslogTime *ttNow) +uchar *MsgGetProp(msg_t *__restrict__ const pMsg, struct templateEntry *__restrict__ const pTpe, + msgPropDescr_t *pProp, rs_size_t *__restrict__ const pPropLen, + unsigned short *__restrict__ const pbMustBeFreed, struct syslogTime * const ttNow) { uchar *pRes; /* result pointer */ rs_size_t bufLen = -1; /* length of string or -1, if not known */ @@ -3674,9 +3694,13 @@ uchar *MsgGetProp(msg_t * const pMsg, struct templateEntry *pTpe, bufLen = -1; *pbMustBeFreed = 1; } else if(pTpe->data.field.options.bJSON) { - jsonEncode(&pRes, pbMustBeFreed, &bufLen); + jsonEncode(&pRes, pbMustBeFreed, &bufLen, RSTRUE); } else if(pTpe->data.field.options.bJSONf) { - jsonField(pTpe, &pRes, pbMustBeFreed, &bufLen); + jsonField(pTpe, &pRes, pbMustBeFreed, &bufLen, RSTRUE); + } else if(pTpe->data.field.options.bJSONr) { + jsonEncode(&pRes, pbMustBeFreed, &bufLen, RSFALSE); + } else if(pTpe->data.field.options.bJSONfr) { + jsonField(pTpe, &pRes, pbMustBeFreed, &bufLen, RSFALSE); } *pPropLen = (bufLen == -1) ? ustrlen(pRes) : bufLen; diff --git a/runtime/parser.c b/runtime/parser.c index 74b28f4..01ed73e 100644 --- a/runtime/parser.c +++ b/runtime/parser.c @@ -57,13 +57,9 @@ DEFobjCurrIf(ruleset) /* static data */ -/* config data */ -static uchar cCCEscapeChar = '#';/* character to be used to start an escape sequence for control chars */ -static int bEscapeCCOnRcv = 1; /* escape control characters on reception: 0 - no, 1 - yes */ -static int bSpaceLFOnRcv = 0; /* replace newlines with spaces on reception: 0 - no, 1 - yes */ -static int bEscape8BitChars = 0; /* escape characters > 127 on reception: 0 - no, 1 - yes */ -static int bEscapeTab = 1; /* escape tab control character when doing CC escapes: 0 - no, 1 - yes */ -static int bDropTrailingLF = 1; /* drop trailing LF's on reception? */ +static char hexdigit[16] = + {'0', '1', '2', '3', '4', '5', '6', '7', '8', + '9', 'A', 'B', 'C', 'D', 'E', 'F' }; /* This is the list of all parsers known to us. * This is also used to unload all modules on shutdown. @@ -319,6 +315,7 @@ SanitizeMsg(msg_t *pMsg) size_t iDst; size_t iMaxLine; size_t maxDest; + uchar pc; sbool bUpdatedLen = RSFALSE; uchar szSanBuf[32*1024]; /* buffer used for sanitizing a string */ @@ -343,7 +340,7 @@ SanitizeMsg(msg_t *pMsg) * compatible to recent IETF developments, we allow the user to * turn on/off this handling. rgerhards, 2007-07-23 */ - if(bDropTrailingLF && pszMsg[lenMsg-1] == '\n') { + if(glbl.GetParserDropTrailingLFOnReception() && pszMsg[lenMsg-1] == '\n') { DBGPRINTF("dropped LF at very end of message (DropTrailingLF is set)\n"); lenMsg--; pszMsg[lenMsg] = '\0'; @@ -363,14 +360,15 @@ SanitizeMsg(msg_t *pMsg) int bNeedSanitize = 0; for(iSrc = 0 ; iSrc < lenMsg ; iSrc++) { if(pszMsg[iSrc] < 32) { - if(bSpaceLFOnRcv && pszMsg[iSrc] == '\n') + if(glbl.GetParserSpaceLFOnReceive() && pszMsg[iSrc] == '\n') { pszMsg[iSrc] = ' '; - else if(pszMsg[iSrc] == '\0' || bEscapeCCOnRcv) { + } else if(pszMsg[iSrc] == '\0' || glbl.GetParserEscapeControlCharactersOnReceive()) { bNeedSanitize = 1; - if (!bSpaceLFOnRcv) + if (!glbl.GetParserSpaceLFOnReceive()) { break; + } } - } else if(pszMsg[iSrc] > 127 && bEscape8BitChars) { + } else if(pszMsg[iSrc] > 127 && glbl.GetParserEscape8BitCharactersOnReceive()) { bNeedSanitize = 1; break; } @@ -387,6 +385,7 @@ SanitizeMsg(msg_t *pMsg) */ iMaxLine = glbl.GetMaxLine(); maxDest = lenMsg * 4; /* message can grow at most four-fold */ + if(maxDest > iMaxLine) maxDest = iMaxLine; /* but not more than the max size! */ if(maxDest < sizeof(szSanBuf)) @@ -399,28 +398,82 @@ SanitizeMsg(msg_t *pMsg) } iDst = iSrc; while(iSrc < lenMsg && iDst < maxDest - 3) { /* leave some space if last char must be escaped */ - if((pszMsg[iSrc] < 32) && (pszMsg[iSrc] != '\t' || bEscapeTab)) { + if((pszMsg[iSrc] < 32) && (pszMsg[iSrc] != '\t' || glbl.GetParserEscapeControlCharacterTab())) { /* note: \0 must always be escaped, the rest of the code currently * can not handle it! -- rgerhards, 2009-08-26 */ - if(pszMsg[iSrc] == '\0' || bEscapeCCOnRcv) { + if(pszMsg[iSrc] == '\0' || glbl.GetParserEscapeControlCharactersOnReceive()) { /* we are configured to escape control characters. Please note * that this most probably break non-western character sets like * Japanese, Korean or Chinese. rgerhards, 2007-07-17 */ - pDst[iDst++] = cCCEscapeChar; + if (glbl.GetParserEscapeControlCharactersCStyle()) { + pDst[iDst++] = '\\'; + + switch (pszMsg[iSrc]) { + case '\0': + pDst[iDst++] = '0'; + break; + case '\a': + pDst[iDst++] = 'a'; + break; + case '\b': + pDst[iDst++] = 'b'; + break; + case '\e': + pDst[iDst++] = 'e'; + break; + case '\f': + pDst[iDst++] = 'f'; + break; + case '\n': + pDst[iDst++] = 'n'; + break; + case '\r': + pDst[iDst++] = 'r'; + break; + case '\t': + pDst[iDst++] = 't'; + break; + case '\v': + pDst[iDst++] = 'v'; + break; + default: + pDst[iDst++] = 'x'; + + pc = pszMsg[iSrc]; + pDst[iDst++] = hexdigit[(pc & 0xF0) >> 4]; + pDst[iDst++] = hexdigit[pc & 0xF]; + + break; + } + + } else { + pDst[iDst++] = glbl.GetParserControlCharacterEscapePrefix(); + pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0300) >> 6); + pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0070) >> 3); + pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0007)); + } + } + + } else if(pszMsg[iSrc] > 127 && glbl.GetParserEscape8BitCharactersOnReceive()) { + if (glbl.GetParserEscapeControlCharactersCStyle()) { + pDst[iDst++] = '\\'; + pDst[iDst++] = 'x'; + + pc = pszMsg[iSrc]; + pDst[iDst++] = hexdigit[(pc & 0xF0) >> 4]; + pDst[iDst++] = hexdigit[pc & 0xF]; + + } else { + /* In this case, we also do the conversion. Note that this most + * probably breaks European languages. -- rgerhards, 2010-01-27 + */ + pDst[iDst++] = glbl.GetParserControlCharacterEscapePrefix(); pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0300) >> 6); pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0070) >> 3); pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0007)); } - } else if(pszMsg[iSrc] > 127 && bEscape8BitChars) { - /* In this case, we also do the conversion. Note that this most - * probably breaks European languages. -- rgerhards, 2010-01-27 - */ - pDst[iDst++] = cCCEscapeChar; - pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0300) >> 6); - pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0070) >> 3); - pDst[iDst++] = '0' + ((pszMsg[iSrc] & 0007)); } else { pDst[iDst++] = pszMsg[iSrc]; } @@ -653,24 +706,6 @@ CODESTARTobjQueryInterface(parser) finalize_it: ENDobjQueryInterface(parser) - - -/* Reset config variables to default values. - * rgerhards, 2007-07-17 - */ -static rsRetVal -resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal) -{ - cCCEscapeChar = '#'; - bEscapeCCOnRcv = 1; /* default is to escape control characters */ - bSpaceLFOnRcv = 0; - bEscape8BitChars = 0; /* default is to escape control characters */ - bEscapeTab = 1; /* default is to escape control characters */ - bDropTrailingLF = 1; /* default is to drop trailing LF's on reception */ - - return RS_RET_OK; -} - /* This destroys the master parserlist and all of its parser entries. MUST only be * done when the module is shut down. Parser modules are NOT unloaded, rsyslog * does that at a later stage for all dynamically loaded modules. @@ -714,15 +749,6 @@ BEGINObjClassInit(parser, 1, OBJ_IS_CORE_MODULE) /* class, version */ CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(ruleset, CORE_COMPONENT)); - CHKiRet(regCfSysLineHdlr((uchar *)"controlcharacterescapeprefix", 0, eCmdHdlrGetChar, NULL, &cCCEscapeChar, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"droptrailinglfonreception", 0, eCmdHdlrBinary, NULL, &bDropTrailingLF, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscapeCCOnRcv, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"spacelfonreceive", 0, eCmdHdlrBinary, NULL, &bSpaceLFOnRcv, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"escape8bitcharactersonreceive", 0, eCmdHdlrBinary, NULL, &bEscape8BitChars, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"escapecontrolcharactertab", 0, eCmdHdlrBinary, NULL, &bEscapeTab, NULL)); - CHKiRet(regCfSysLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler, resetConfigVariables, NULL, NULL)); - InitParserList(&pParsLstRoot); InitParserList(&pDfltParsLst); ENDObjClassInit(parser) - diff --git a/runtime/parser.h b/runtime/parser.h index 87a6269..17a7a8f 100644 --- a/runtime/parser.h +++ b/runtime/parser.h @@ -21,7 +21,6 @@ #ifndef INCLUDED_PARSER_H #define INCLUDED_PARSER_H - /* we create a small helper object, a list of parsers, that we can use to * build a chain of them whereever this is needed (initially thought to be * used in ruleset.c as well as ourselvs). diff --git a/runtime/queue.c b/runtime/queue.c index 59ac113..4664010 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -86,7 +86,6 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiS static rsRetVal qAddDirect(qqueue_t *pThis, msg_t *pMsg); static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis); -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis); static rsRetVal qDestructDisk(qqueue_t *pThis); rsRetVal qqueueSetSpoolDir(qqueue_t *pThis, uchar *pszSpoolDir, int lenSpoolDir); @@ -709,9 +708,13 @@ queueSwitchToEmergencyMode(qqueue_t *pThis, rsRetVal initiatingError) pThis->qType = QUEUETYPE_DIRECT; pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; if(pThis->pqParent != NULL) { DBGOPRINT((obj_t*) pThis, "DA queue is in emergency mode, disabling DA in parent\n"); pThis->pqParent->bIsDA = 0; @@ -961,13 +964,11 @@ static rsRetVal qDestructDirect(qqueue_t __attribute__((unused)) *pThis) return RS_RET_OK; } -static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) +static rsRetVal qAddDirectWithWti(qqueue_t *pThis, msg_t* pMsg, wti_t *pWti) { batch_t singleBatch; batch_obj_t batchObj; batch_state_t batchState = BATCH_STATE_RDY; - sbool active = 1; - int i; DEFiRet; //TODO: init batchObj (states _OK and new fields -- CHECK) @@ -987,46 +988,29 @@ static rsRetVal qAddDirect(qqueue_t *pThis, msg_t* pMsg) singleBatch.nElem = 1; /* there always is only one in direct mode */ singleBatch.pElem = &batchObj; singleBatch.eltState = &batchState; - singleBatch.active = &active; - iRet = pThis->pConsumer(pThis->pAction, &singleBatch, &pThis->bShutdownImmediate); - /* delete the batch string params: TODO: create its own "class" for this */ - for(i = 0 ; i < CONF_OMOD_NUMSTRINGS_MAXSIZE ; ++i) { - free(batchObj.staticActStrings[i]); - } + iRet = pThis->pConsumer(pThis->pAction, &singleBatch, pWti); msgDestruct(&pMsg); RETiRet; } -/* "enqueue" a batch in direct mode. This is a shortcut which saves all the overhead - * otherwise incured. -- rgerhards, ~2010-06-23 +/* this is called if we do not have a pWti. This currently only happens + * when we are called from a main queue in direct mode. If so, we need + * to obtain a dummy pWti. */ -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch) +static rsRetVal +qAddDirect(qqueue_t *pThis, msg_t* pMsg) { + wti_t *pWti; DEFiRet; - ASSERT(pThis != NULL); - - /* calling the consumer is quite different here than it is from a worker thread */ - /* we need to provide the consumer's return value back to the caller because in direct - * mode the consumer probably has a lot to convey (which get's lost in the other modes - * because they are asynchronous. But direct mode is deliberately synchronous. - * rgerhards, 2008-02-12 - * We use our knowledge about the batch_t structure below, but without that, we - * pay a too-large performance toll... -- rgerhards, 2009-04-22 - */ - iRet = pThis->pConsumer(pThis->pAction, pBatch, NULL); - + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + iRet = qAddDirectWithWti(pThis, pMsg, pWti); RETiRet; } -static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis) -{ - return RS_RET_OK; -} - - /* --------------- end type-specific handlers -------------------- */ @@ -1321,7 +1305,7 @@ finalize_it: * to modify some parameters before the queue is actually started. */ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*,int*)) + int iMaxQueueSize, rsRetVal (*pConsumer)(void*, batch_t*, wti_t*)) { DEFiRet; qqueue_t *pThis; @@ -1703,6 +1687,9 @@ DequeueConsumable(qqueue_t *pThis, wti_t *pWti) /* The rate limiter * + * IMPORTANT: the rate-limiter MUST unlock and re-lock the queue when + * it actually delays processing. Otherwise inputs are stalled. + * * Here we may wait if a dequeue time window is defined or if we are * rate-limited. TODO: If we do so, we should also look into the * way new worker threads are spawned. Obviously, it doesn't make much @@ -1788,8 +1775,10 @@ RateLimiter(qqueue_t *pThis) } if(iDelay > 0) { + pthread_mutex_unlock(pThis->mut); DBGOPRINT((obj_t*) pThis, "outside dequeue time window, delaying %d seconds\n", iDelay); srSleep(iDelay, 0); + pthread_mutex_lock(pThis->mut); } RETiRet; @@ -1883,7 +1872,8 @@ ConsumerReg(qqueue_t *pThis, wti_t *pWti) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &iCancelStateSave); - CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, &pThis->bShutdownImmediate)); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; + CHKiRet(pThis->pConsumer(pThis->pAction, &pWti->batch, pWti)); /* we now need to check if we should deliberately delay processing a bit * and, if so, do that. -- rgerhards, 2008-01-30 @@ -2116,9 +2106,13 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ case QUEUETYPE_DIRECT: pThis->qConstruct = qConstructDirect; pThis->qDestruct = qDestructDirect; + /* these entry points shall not be used in direct mode + * To catch program errors, make us abort if that happens! + * rgerhards, 2013-11-05 + */ pThis->qAdd = qAddDirect; - pThis->qDel = qDelDirect; pThis->MultiEnq = qqueueMultiEnqObjDirect; + pThis->qDel = NULL; break; } @@ -2191,9 +2185,12 @@ qqueueStart(qqueue_t *pThis) /* this is the ConstructionFinalizer */ pThis->iLowWtrMrk = 1; } } + if( pThis->iMinMsgsPerWrkr < 1 - || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) + || pThis->iMinMsgsPerWrkr > pThis->iMaxQueueSize ) { pThis->iMinMsgsPerWrkr = pThis->iMaxQueueSize / pThis->iNumWorkerThreads; + } + if(pThis->iFullDlyMrk == -1 || pThis->iFullDlyMrk > pThis->iMaxQueueSize) { pThis->iFullDlyMrk = (pThis->iMaxQueueSize / 100) * 97; if(pThis->iFullDlyMrk == 0) { @@ -2799,13 +2796,14 @@ static rsRetVal qqueueMultiEnqObjDirect(qqueue_t *pThis, multi_submit_t *pMultiSub) { int i; + wti_t *pWti; DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - assert(pMultiSub != NULL); + pWti = wtiGetDummy(); + pWti->pbShutdownImmediate = &pThis->bShutdownImmediate; for(i = 0 ; i < pMultiSub->nElem ; ++i) { - CHKiRet(qAddDirect(pThis, (void*)pMultiSub->ppMsgs[i])); + CHKiRet(qAddDirectWithWti(pThis, (void*)pMultiSub->ppMsgs[i], pWti)); } finalize_it: @@ -2814,22 +2812,7 @@ finalize_it: /* ------------------------------ END multi-enqueue functions ------------------------------ */ -/* enqueue a new user data element in direct mode - * NOTE/TODO: This is a TESTER/EXPERIEMENTAL, to be changed to better - * code later on (like multi submit!) 2010-06-10 - * Enqueues the new element and awakes worker thread. - */ -rsRetVal -qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg) -{ - DEFiRet; - ISOBJ_TYPE_assert(pThis, qqueue); - iRet = qAddDirect(pThis, pMsg); - RETiRet; -} - - -/* enqueue a new user data element +/* enqueue a new user data element * Enqueues the new element and awakes worker thread. */ rsRetVal diff --git a/runtime/queue.h b/runtime/queue.h index 19ea735..468e19b 100644 --- a/runtime/queue.h +++ b/runtime/queue.h @@ -103,11 +103,10 @@ struct queue_s { * the user really wanted...). -- rgerhards, 2008-04-02 */ /* end dequeue time window */ - rsRetVal (*pConsumer)(void *,batch_t*,int*); /* user-supplied consumer function for dequeued messages */ + rsRetVal (*pConsumer)(void *,batch_t*, wti_t*); /* user-supplied consumer function for dequeued messages */ /* calling interface for pConsumer: arg1 is the global user pointer from this structure, arg2 is the * user pointer array that was dequeued (actual sample: for actions, arg1 is the pAction and arg2 - * is pointer to an array of message message pointers), arg3 is a pointer to an interger which is zero - * during normal operations and one if the consumer must urgently shut down. + * is pointer to an array of message message pointers) */ /* type-specific handlers (set during construction) */ rsRetVal (*qConstruct)(struct queue_s *pThis); @@ -195,14 +194,12 @@ struct queue_s { /* prototypes */ rsRetVal qqueueDestruct(qqueue_t **ppThis); -rsRetVal qqueueEnqMsgDirect(qqueue_t *pThis, msg_t *pMsg); rsRetVal qqueueEnqMsg(qqueue_t *pThis, flowControl_t flwCtlType, msg_t *pMsg); rsRetVal qqueueStart(qqueue_t *pThis); rsRetVal qqueueSetMaxFileSize(qqueue_t *pThis, size_t iMaxFileSize); rsRetVal qqueueSetFilePrefix(qqueue_t *pThis, uchar *pszPrefix, size_t iLenPrefix); rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThreads, - int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, int*)); -rsRetVal qqueueEnqObjDirectBatch(qqueue_t *pThis, batch_t *pBatch); + int iMaxQueueSize, rsRetVal (*pConsumer)(void*,batch_t*, wti_t *)); int queueCnfParamsSet(struct nvlst *lst); rsRetVal qqueueApplyCnfParam(qqueue_t *pThis, struct nvlst *lst); void qqueueSetDefaultsRulesetQueue(qqueue_t *pThis); diff --git a/runtime/rsconf.c b/runtime/rsconf.c index c28f17b..9ccf05b 100644 --- a/runtime/rsconf.c +++ b/runtime/rsconf.c @@ -680,8 +680,12 @@ activateMainQueue() mainqCnfObj = glbl.GetmainqCnfObj(); DBGPRINTF("activateMainQueue: mainq cnf obj ptr is %p\n", mainqCnfObj); /* create message queue */ - CHKiRet_Hdlr(createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), - (mainqCnfObj == NULL) ? NULL : mainqCnfObj->nvlst)) { + iRet = createMainQueue(&pMsgQueue, UCHAR_CONSTANT("main Q"), + (mainqCnfObj == NULL) ? NULL : mainqCnfObj->nvlst); + if(iRet == RS_RET_OK) { + iRet = startMainQueue(pMsgQueue); + } + if(iRet != RS_RET_OK) { /* no queue is fatal, we need to give up in that case... */ fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet); FINALIZE; @@ -744,6 +748,7 @@ activate(rsconf_t *cnf) tellModulesActivateConfig(); startInputModules(); CHKiRet(activateActions()); + CHKiRet(activateRulesetQueues()); CHKiRet(activateMainQueue()); /* finally let the inputs run... */ runInputModules(); @@ -1221,6 +1226,7 @@ ourConf = loadConf; // TODO: remove, once ourConf is gone! ABORT_FINALIZE(RS_RET_NO_ACTIONS); } tellLexEndParsing(); + DBGPRINTF("Number of actions in this configuration: %d\n", iActionNbr); rulesetOptimizeAll(loadConf); tellCoreConfigLoadDone(); diff --git a/runtime/rsyslog.c b/runtime/rsyslog.c index a3d4057..7f020d3 100644 --- a/runtime/rsyslog.c +++ b/runtime/rsyslog.c @@ -35,7 +35,7 @@ * * Module begun 2008-04-16 by Rainer Gerhards * - * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2014 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -59,6 +59,7 @@ #include <stdio.h> #include <stdlib.h> #include <assert.h> +#include <liblogging/stdlog.h> #include "rsyslog.h" #include "obj.h" @@ -136,6 +137,8 @@ rsrtInit(char **ppErrObj, obj_if_t *pObjIF) if(iRefCount == 0) { /* init runtime only if not yet done */ + stdlog_init(0); + stdlog_hdl = NULL; #ifdef HAVE_PTHREAD_SETSCHEDPARAM CHKiRet(pthread_getschedparam(pthread_self(), &default_thr_sched_policy, diff --git a/runtime/rsyslog.h b/runtime/rsyslog.h index c0158a7..0b6a26d 100644 --- a/runtime/rsyslog.h +++ b/runtime/rsyslog.h @@ -49,6 +49,7 @@ #define CONF_PROGNAME_BUFSIZE 16 #define CONF_HOSTNAME_BUFSIZE 32 #define CONF_PROP_BUFSIZE 16 /* should be close to sizeof(ptr) or lighly above it */ +#define CONF_IPARAMS_BUFSIZE 16 /* initial size of iparams array in wti (is automatically extended) */ #define CONF_MIN_SIZE_FOR_COMPRESS 60 /* config param: minimum message size to try compression. The smaller * the message, the less likely is any compression gain. We check for * gain before we submit the message. But to do so we still need to @@ -386,6 +387,9 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth RS_RET_RESUMED = -2359,/**< status: action was resumed (used for reporting) */ RS_RET_RELP_NO_TLS = -2360,/**< librel does not support TLS (but TLS requested) */ + /* up to 2400 reserved for 7.5 & 7.6 */ + RS_RET_INVLD_OMOD = -2400, /**< invalid output module, does not provide proper interfaces */ + /* RainerScript error messages (range 1000.. 1999) */ RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */ RS_RET_FIELD_NOT_FOUND = 1002, /**< field() function did not find requested field */ @@ -473,6 +477,37 @@ extern pthread_attr_t default_thread_attr; extern int default_thr_sched_policy; #endif +/* The following structure defines immutable parameters which need to + * be passed as action parameters. + * + * Note that output plugins may request multiple templates. Let's say + * an output requests n templates. Than the overall table must hold + * n*nbrMsgs records, and each messages begins on a n-boundary. There + * is a macro defined below to access the proper element. + * + * WARNING: THIS STRUCTURE IS PART OF THE ***OUTPUT MODULE INTERFACE*** + * It is passed into the doCommit() function. Do NOT modify it until + * absolutely necessary - all output plugins need to be changed! + * + * If a change is "just" for internal working, consider adding a + * separate paramter outside of this structure. Of course, it is + * best to avoid this as well ;-) + * rgerhards, 2013-12-04 + */ +struct __attribute__ ((__packed__)) actWrkrIParams { + uchar *param; + uint32_t lenBuf; /* length of string buffer (if string ptr) */ + uint32_t lenStr; /* length of current string (if string ptr) */ +}; + +/* macro to access actWrkrIParams base object: + * param is ptr to base address + * nActTpls is the number of templates the action has requested + * iMsg is the message index + * iTpl is the template index + * This macro can be used for read and write access. + */ +#define actParam(param, nActTpls, iMsg, iTpl) (param[(iMsg*nActTpls)+iTpl]) /* for the time being, we do our own portability handling here. It * looks like autotools either does not yet support checks for it, or diff --git a/runtime/ruleset.c b/runtime/ruleset.c index d811a41..4bf8c05 100644 --- a/runtime/ruleset.c +++ b/runtime/ruleset.c @@ -48,6 +48,7 @@ #include "rainerscript.h" #include "srUtils.h" #include "modules.h" +#include "wti.h" #include "dirty.h" /* for main ruleset queue creation */ /* static data */ @@ -67,8 +68,8 @@ static struct cnfparamblk rspblk = }; /* forward definitions */ -static rsRetVal processBatch(batch_t *pBatch); -static rsRetVal scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active); +static rsRetVal processBatch(batch_t *pBatch, wti_t *pWti); +static rsRetVal scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti); /* ---------- linked-list key handling functions (ruleset) ---------- */ @@ -160,254 +161,130 @@ finalize_it: RETiRet; } - -/* This function is similar to processBatch(), but works on a batch that - * contains rules from multiple rulesets. In this case, we can not push - * the whole batch through the ruleset. Instead, we examine it and - * partition it into sub-rulesets which we then push through the system. - * rgerhards, 2010-06-15 - */ -static inline rsRetVal -processBatchMultiRuleset(batch_t *pBatch) +/* driver to iterate over all rulesets */ +DEFFUNC_llExecFunc(doActivateRulesetQueues) { - ruleset_t *currRuleset; - batch_t snglRuleBatch; - int i; - int iStart; /* start index of partial batch */ - int iNew; /* index for new (temporary) batch */ - int bHaveUnprocessed; /* do we (still) have unprocessed entries? (loop term predicate) */ DEFiRet; - - do { - bHaveUnprocessed = 0; - /* search for first unprocessed element */ - for(iStart = 0 ; iStart < pBatch->nElem && pBatch->eltState[iStart] == BATCH_STATE_DISC ; ++iStart) - /* just search, no action */; - if(iStart == pBatch->nElem) - break; /* everything processed */ - - /* prepare temporary batch */ - CHKiRet(batchInit(&snglRuleBatch, pBatch->nElem)); - snglRuleBatch.pbShutdownImmediate = pBatch->pbShutdownImmediate; - currRuleset = batchElemGetRuleset(pBatch, iStart); - iNew = 0; - for(i = iStart ; i < pBatch->nElem ; ++i) { - if(batchElemGetRuleset(pBatch, i) == currRuleset) { - /* for performance reasons, we copy only those members that we actually need */ - snglRuleBatch.pElem[iNew].pMsg = pBatch->pElem[i].pMsg; - snglRuleBatch.eltState[iNew] = pBatch->eltState[i]; - ++iNew; - /* We indicate the element also as done, so it will not be processed again */ - pBatch->eltState[i] = BATCH_STATE_DISC; - } else { - bHaveUnprocessed = 1; - } - } - snglRuleBatch.nElem = iNew; /* was left just right by the for loop */ - batchSetSingleRuleset(&snglRuleBatch, 1); - /* process temp batch */ - processBatch(&snglRuleBatch); - batchFree(&snglRuleBatch); - } while(bHaveUnprocessed == 1); - -finalize_it: + ruleset_t* pThis = (ruleset_t*) pData; + dbgprintf("Activating Ruleset Queue[%p] for Ruleset %s\n", + pThis->pQueue, pThis->pszName); + if(pThis->pQueue != NULL) + startMainQueue(pThis->pQueue); RETiRet; } - -/* return a new "active" structure for the batch. Free with freeActive(). */ -static inline sbool *newActive(batch_t *pBatch) +/* activate all ruleset queues */ +rsRetVal +activateRulesetQueues() { - return malloc(sizeof(sbool) * batchNumMsgs(pBatch)); - -} -static inline void freeActive(sbool *active) { free(active); } + DEFiRet; + llExecFunc(&(runConf->rulesets.llRulesets), doActivateRulesetQueues, NULL); -/* for details, see scriptExec() header comment! */ -/* call action for all messages with filter on */ -static rsRetVal -execAct(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) -{ - DEFiRet; -dbgprintf("RRRR: execAct [%s]: batch of %d elements, active %p\n", modGetName(stmt->d.act->pMod), batchNumMsgs(pBatch), active); - pBatch->active = active; - stmt->d.act->submitToActQ(stmt->d.act, pBatch); RETiRet; } + static rsRetVal -execSet(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execAct(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - int i; - struct var result; DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - cnfexprEval(stmt->d.s_set.expr, &result, pBatch->pElem[i].pMsg); - msgSetJSONFromVar(pBatch->pElem[i].pMsg, stmt->d.s_set.varname, - &result); - varDelete(&result); - } + if(stmt->d.act->bDisabled) { + DBGPRINTF("action %d died, do NOT execute\n", stmt->d.act->iActionNbr); + FINALIZE; + } + + DBGPRINTF("executing action %d\n", stmt->d.act->iActionNbr); + stmt->d.act->submitToActQ(stmt->d.act, pWti, pMsg); + if(iRet != RS_RET_DISCARDMSG) { + /* note: we ignore the error code here, as we do NEVER want to + * stop script execution due to action return code + */ + iRet = RS_RET_OK; } +finalize_it: RETiRet; } static rsRetVal -execUnset(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execSet(struct cnfstmt *stmt, msg_t *pMsg) { - int i; + struct var result; DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - msgDelJSON(pBatch->pElem[i].pMsg, stmt->d.s_unset.varname); - } - } + cnfexprEval(stmt->d.s_set.expr, &result, pMsg); + msgSetJSONFromVar(pMsg, stmt->d.s_set.varname, &result); + varDelete(&result); RETiRet; } -/* for details, see scriptExec() header comment! */ -/* "stop" simply discards the filtered items - it's just a (hopefully more intuitive - * shortcut for users. - */ static rsRetVal -execStop(batch_t *pBatch, sbool *active) +execUnset(struct cnfstmt *stmt, msg_t *pMsg) { - int i; DEFiRet; - for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pBatch->pbShutdownImmediate) ; ++i) { - if( pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) { - pBatch->eltState[i] = BATCH_STATE_DISC; - } - } + msgDelJSON(pMsg, stmt->d.s_unset.varname); RETiRet; } + static rsRetVal -execCall(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execCall(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - msg_t *pMsg; - int i; DEFiRet; if(stmt->d.s_call.ruleset == NULL) { - scriptExec(stmt->d.s_call.stmt, pBatch, active); + CHKiRet(scriptExec(stmt->d.s_call.stmt, pMsg, pWti)); } else { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - if(active == NULL || active[i]) { - CHKmalloc(pMsg = MsgDup((msg_t*) pBatch->pElem[i].pMsg)); - DBGPRINTF("CALL: forwarding message %d to async ruleset %p\n", - i, stmt->d.s_call.ruleset->pQueue); - MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); - MsgSetRuleset(pMsg, stmt->d.s_call.ruleset); - /* Note: we intentionally use submitMsg2() here, as we process messages - * that were already run through the rate-limiter. - */ - submitMsg2(pMsg); - } - } + CHKmalloc(pMsg = MsgDup((msg_t*) pMsg)); + DBGPRINTF("CALL: forwarding message to async ruleset %p\n", + stmt->d.s_call.ruleset->pQueue); + MsgSetFlowControlType(pMsg, eFLOWCTL_NO_DELAY); + MsgSetRuleset(pMsg, stmt->d.s_call.ruleset); + /* Note: we intentionally use submitMsg2() here, as we process messages + * that were already run through the rate-limiter. + */ + submitMsg2(pMsg); } finalize_it: RETiRet; } -/* for details, see scriptExec() header comment! */ -// save current filter, evaluate new one -// perform then (if any message) -// if ELSE given: -// set new filter, inverted -// perform else (if any messages) static rsRetVal -execIf(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +execIf(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *newAct; - int i; sbool bRet; - sbool allInactive = 1; DEFiRet; - newAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - FINALIZE; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - if(active == NULL || active[i]) { - bRet = cnfexprEvalBool(stmt->d.s_if.expr, pBatch->pElem[i].pMsg); - allInactive = 0; - } else - bRet = 0; - newAct[i] = bRet; - DBGPRINTF("batch: item %d: expr eval: %d\n", i, bRet); - } - - if(allInactive) { - DBGPRINTF("execIf: all batch elements are inactive, holding execution\n"); - freeActive(newAct); - FINALIZE; - } - - if(stmt->d.s_if.t_then != NULL) { - scriptExec(stmt->d.s_if.t_then, pBatch, newAct); - } - if(stmt->d.s_if.t_else != NULL) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - FINALIZE; - if(pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) - newAct[i] = !newAct[i]; - } - scriptExec(stmt->d.s_if.t_else, pBatch, newAct); + bRet = cnfexprEvalBool(stmt->d.s_if.expr, pMsg); + DBGPRINTF("if condition result is %d\n", bRet); + if(bRet) { + if(stmt->d.s_if.t_then != NULL) + CHKiRet(scriptExec(stmt->d.s_if.t_then, pMsg, pWti)); + } else { + if(stmt->d.s_if.t_else != NULL) + CHKiRet(scriptExec(stmt->d.s_if.t_else, pMsg, pWti)); } - freeActive(newAct); finalize_it: RETiRet; } -/* for details, see scriptExec() header comment! */ -static void -execPRIFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +static rsRetVal +execPRIFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *newAct; - msg_t *pMsg; int bRet; - int i; - newAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - pMsg = pBatch->pElem[i].pMsg; - if(active == NULL || active[i]) { - if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) || - ((stmt->d.s_prifilt.pmask[pMsg->iFacility] - & (1<<pMsg->iSeverity)) == 0) ) - bRet = 0; - else - bRet = 1; - } else - bRet = 0; - newAct[i] = bRet; - DBGPRINTF("batch: item %d PRIFILT %d\n", i, newAct[i]); - } - - if(stmt->d.s_prifilt.t_then != NULL) { - scriptExec(stmt->d.s_prifilt.t_then, pBatch, newAct); - } - if(stmt->d.s_prifilt.t_else != NULL) { - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] != BATCH_STATE_DISC - && (active == NULL || active[i])) - newAct[i] = !newAct[i]; - } - scriptExec(stmt->d.s_prifilt.t_else, pBatch, newAct); + DEFiRet; + if( (stmt->d.s_prifilt.pmask[pMsg->iFacility] == TABLE_NOPRI) || + ((stmt->d.s_prifilt.pmask[pMsg->iFacility] + & (1<<pMsg->iSeverity)) == 0) ) + bRet = 0; + else + bRet = 1; + + DBGPRINTF("PRIFILT condition result is %d\n", bRet); + if(bRet) { + if(stmt->d.s_prifilt.t_then != NULL) + CHKiRet(scriptExec(stmt->d.s_prifilt.t_then, pMsg, pWti)); + } else { + if(stmt->d.s_prifilt.t_else != NULL) + CHKiRet(scriptExec(stmt->d.s_prifilt.t_else, pMsg, pWti)); } - freeActive(newAct); +finalize_it: + RETiRet; } @@ -502,79 +379,67 @@ done: return bRet; } -/* for details, see scriptExec() header comment! */ -static void -execPROPFILT(struct cnfstmt *stmt, batch_t *pBatch, sbool *active) +static rsRetVal +execPROPFILT(struct cnfstmt *stmt, msg_t *pMsg, wti_t *pWti) { - sbool *thenAct; sbool bRet; - int i; - thenAct = newActive(pBatch); - for(i = 0 ; i < batchNumMsgs(pBatch) ; ++i) { - if(*(pBatch->pbShutdownImmediate)) - return; - if(pBatch->eltState[i] == BATCH_STATE_DISC) - continue; /* will be ignored in any case */ - if(active == NULL || active[i]) { - bRet = evalPROPFILT(stmt, pBatch->pElem[i].pMsg); - } else - bRet = 0; - thenAct[i] = bRet; - DBGPRINTF("batch: item %d PROPFILT %d\n", i, thenAct[i]); - } + DEFiRet; - scriptExec(stmt->d.s_propfilt.t_then, pBatch, thenAct); - freeActive(thenAct); + bRet = evalPROPFILT(stmt, pMsg); + DBGPRINTF("PROPFILT condition result is %d\n", bRet); + if(bRet) + CHKiRet(scriptExec(stmt->d.s_propfilt.t_then, pMsg, pWti)); +finalize_it: + RETiRet; } /* The rainerscript execution engine. It is debatable if that would be better * contained in grammer/rainerscript.c, HOWEVER, that file focusses primarily * on the parsing and object creation part. So as an actual executor, it is * better suited here. - * param active: if NULL, all messages are active (to be processed), if non-null - * this is an array of the same size as the batch. If 1, the message - * is to be processed, otherwise not. - * NOTE: this function must receive batches which contain a single ruleset ONLY! * rgerhards, 2012-09-04 */ static rsRetVal -scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) +scriptExec(struct cnfstmt *root, msg_t *pMsg, wti_t *pWti) { - DEFiRet; struct cnfstmt *stmt; + DEFiRet; for(stmt = root ; stmt != NULL ; stmt = stmt->next) { + if(*pWti->pbShutdownImmediate) { + DBGPRINTF("scriptExec: ShutdownImmediate set, " + "force terminating\n"); + ABORT_FINALIZE(RS_RET_FORCE_TERM); + } if(Debug) { - dbgprintf("scriptExec: batch of %d elements, active %p, active[0]:%d\n", - batchNumMsgs(pBatch), active, (active == NULL ? 1 : active[0])); cnfstmtPrintOnly(stmt, 2, 0); } switch(stmt->nodetype) { case S_NOP: break; case S_STOP: - execStop(pBatch, active); + ABORT_FINALIZE(RS_RET_DISCARDMSG); break; case S_ACT: - execAct(stmt, pBatch, active); + CHKiRet(execAct(stmt, pMsg, pWti)); break; case S_SET: - execSet(stmt, pBatch, active); + CHKiRet(execSet(stmt, pMsg)); break; case S_UNSET: - execUnset(stmt, pBatch, active); + CHKiRet(execUnset(stmt, pMsg)); break; case S_CALL: - execCall(stmt, pBatch, active); + CHKiRet(execCall(stmt, pMsg, pWti)); break; case S_IF: - execIf(stmt, pBatch, active); + CHKiRet(execIf(stmt, pMsg, pWti)); break; case S_PRIFILT: - execPRIFILT(stmt, pBatch, active); + CHKiRet(execPRIFILT(stmt, pMsg, pWti)); break; case S_PROPFILT: - execPROPFILT(stmt, pBatch, active); + CHKiRet(execPROPFILT(stmt, pMsg, pWti)); break; default: dbgprintf("error: unknown stmt type %u during exec\n", @@ -582,36 +447,43 @@ scriptExec(struct cnfstmt *root, batch_t *pBatch, sbool *active) break; } } +finalize_it: RETiRet; } /* Process (consume) a batch of messages. Calls the actions configured. - * If the whole batch uses a singel ruleset, we can process the batch as - * a whole. Otherwise, we need to process it slower, on a message-by-message - * basis (what can be optimized to a per-ruleset basis) - * rgerhards, 2005-10-13 + * This is called by MAIN queues. */ static rsRetVal -processBatch(batch_t *pBatch) +processBatch(batch_t *pBatch, wti_t *pWti) { - ruleset_t *pThis; + int i; + msg_t *pMsg; + ruleset_t *pRuleset; DEFiRet; - assert(pBatch != NULL); - - DBGPRINTF("processBatch: batch of %d elements must be processed\n", pBatch->nElem); - if(pBatch->bSingleRuleset) { - pThis = batchGetRuleset(pBatch); - if(pThis == NULL) - pThis = ourConf->rulesets.pDflt; - ISOBJ_TYPE_assert(pThis, ruleset); - CHKiRet(scriptExec(pThis->root, pBatch, NULL)); - } else { - CHKiRet(processBatchMultiRuleset(pBatch)); + + DBGPRINTF("processBATCH: batch of %d elements must be processed\n", pBatch->nElem); + + wtiResetExecState(pWti, pBatch); + + /* execution phase */ + for(i = 0 ; i < batchNumMsgs(pBatch) && !*(pWti->pbShutdownImmediate) ; ++i) { + pMsg = pBatch->pElem[i].pMsg; + DBGPRINTF("processBATCH: next msg %d: %.128s\n", i, pMsg->pszRawMsg); + pRuleset = (pMsg->pRuleset == NULL) ? ourConf->rulesets.pDflt : pMsg->pRuleset; + scriptExec(pRuleset->root, pMsg, pWti); + // TODO: think if we need a return state of scriptExec - most probably + // the answer is "no", as we need to process the batch in any case! + // TODO: we must refactor this! flag messages as committed + batchSetElemState(pBatch, i, BATCH_STATE_COMM); } -finalize_it: - DBGPRINTF("ruleset.ProcessMsg() returns %d\n", iRet); + /* commit phase */ + dbgprintf("END batch execution phase, entering to commit phase\n"); + actionCommitAllDirect(pWti); + + DBGPRINTF("processBATCH: batch of %d elements has been processed\n", pBatch->nElem); RETiRet; } diff --git a/runtime/ruleset.h b/runtime/ruleset.h index 9905b53..d3dfd66 100644 --- a/runtime/ruleset.h +++ b/runtime/ruleset.h @@ -46,7 +46,7 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ rsRetVal (*Destruct)(ruleset_t **ppThis); rsRetVal (*DestructAllActions)(rsconf_t *conf); rsRetVal (*SetName)(ruleset_t *pThis, uchar *pszName); - rsRetVal (*ProcessBatch)(batch_t*); + rsRetVal (*ProcessBatch)(batch_t*, wti_t *); rsRetVal (*GetRuleset)(rsconf_t *conf, ruleset_t **ppThis, uchar*); rsRetVal (*SetDefaultRuleset)(rsconf_t *conf, uchar*); rsRetVal (*SetCurrRuleset)(rsconf_t *conf, uchar*); @@ -64,8 +64,9 @@ BEGINinterface(ruleset) /* name must also be changed in ENDinterface macro! */ /* AddRule() removed */ /*TODO:REMOVE*/rsRetVal (*IterateAllActions)(rsconf_t *conf, rsRetVal (*pFunc)(void*, void*), void* pParam); void (*AddScript)(ruleset_t *pThis, struct cnfstmt *script); + /* v8: changed processBatch interface */ ENDinterface(ruleset) -#define rulesetCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */ +#define rulesetCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */ /* prototypes */ @@ -104,6 +105,7 @@ rulesetHasQueue(ruleset_t *pRuleset) rsRetVal rulesetGetRuleset(rsconf_t *conf, ruleset_t **ppRuleset, uchar *pszName); rsRetVal rulesetOptimizeAll(rsconf_t *conf); rsRetVal rulesetProcessCnf(struct cnfobj *o); +rsRetVal activateRulesetQueues(void); /* Set a current rule set to already-known pointer */ static inline void diff --git a/runtime/srUtils.h b/runtime/srUtils.h index 8626a4b..1dd2506 100644 --- a/runtime/srUtils.h +++ b/runtime/srUtils.h @@ -77,7 +77,7 @@ unsigned char *srUtilStrDup(unsigned char *pOld, size_t len); * for it. * added 2007-07-17 by rgerhards */ -int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, uid_t uid, gid_t gid, int bFailOnChown); +int makeFileParentDirs(const uchar *const szFile, size_t lenFile, mode_t mode, uid_t uid, gid_t gid, int bFailOnChown); int execProg(uchar *program, int bWait, uchar *arg); void skipWhiteSpace(uchar **pp); rsRetVal genFileName(uchar **ppName, uchar *pDirName, size_t lenDirName, uchar *pFName, diff --git a/runtime/srutils.c b/runtime/srutils.c index 8eb2459..6d842ab 100644 --- a/runtime/srutils.c +++ b/runtime/srutils.c @@ -191,7 +191,7 @@ uchar *srUtilStrDup(uchar *pOld, size_t len) * try because otherwise we would potentially run into an endless loop. * loop. -- rgerhards, 2010-03-25 */ -int makeFileParentDirs(uchar *szFile, size_t lenFile, mode_t mode, +int makeFileParentDirs(const uchar *const szFile, size_t lenFile, mode_t mode, uid_t uid, gid_t gid, int bFailOnChownFail) { uchar *p; diff --git a/runtime/stream.c b/runtime/stream.c index b35d6a1..223ee93 100644 --- a/runtime/stream.c +++ b/runtime/stream.c @@ -76,7 +76,7 @@ DEFobjCurrIf(zlibw) /* forward definitions */ static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip); -static rsRetVal strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf); +static rsRetVal strmWrite(strm_t *__restrict__ const pThis, const uchar *__restrict__ const pBuf, const size_t lenBuf); static rsRetVal strmCloseFile(strm_t *pThis); static void *asyncWriterThread(void *pPtr); static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush); @@ -1553,7 +1553,7 @@ finalize_it: /* write a *single* character to a stream object -- rgerhards, 2008-01-10 */ -static rsRetVal strmWriteChar(strm_t *pThis, uchar c) +static rsRetVal strmWriteChar(strm_t *__restrict__ const pThis, const uchar c) { DEFiRet; @@ -1586,7 +1586,7 @@ finalize_it: * strmWrite(), which does the lock (aka: we must not lock it, else we * would run into a recursive lock, resulting in a deadlock!) */ -static rsRetVal strmWriteLong(strm_t *pThis, long i) +static rsRetVal strmWriteLong(strm_t *__restrict__ const pThis, const long i) { DEFiRet; uchar szBuf[32]; @@ -1618,7 +1618,7 @@ finalize_it: * worth nothing. -- rgerhards, 2010-03-10 */ static rsRetVal -strmWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf) +strmWrite(strm_t *__restrict__ const pThis, const uchar *__restrict__ const pBuf, size_t lenBuf) { DEFiRet; size_t iWrite; diff --git a/runtime/stream.h b/runtime/stream.h index 092d322..8f26bdc 100644 --- a/runtime/stream.h +++ b/runtime/stream.h @@ -163,7 +163,7 @@ BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */ rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC); rsRetVal (*UnreadChar)(strm_t *pThis, uchar c); rsRetVal (*SeekCurrOffs)(strm_t *pThis); - rsRetVal (*Write)(strm_t *pThis, uchar *pBuf, size_t lenBuf); + rsRetVal (*Write)(strm_t *const pThis, const uchar *const pBuf, size_t lenBuf); rsRetVal (*WriteChar)(strm_t *pThis, uchar c); rsRetVal (*WriteLong)(strm_t *pThis, long i); rsRetVal (*SetFName)(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); diff --git a/runtime/typedefs.h b/runtime/typedefs.h index 37ab28b..8d206f6 100644 --- a/runtime/typedefs.h +++ b/runtime/typedefs.h @@ -109,6 +109,7 @@ typedef uint64 qDeqID; /* queue Dequeue order ID. 32 bits is considered dangerou typedef struct tcpLstnPortList_s tcpLstnPortList_t; // TODO: rename? typedef struct strmLstnPortList_s strmLstnPortList_t; // TODO: rename? +typedef struct actWrkrIParams actWrkrIParams_t; /* under Solaris (actually only SPARC), we need to redefine some types * to be void, so that we get void* pointers. Otherwise, we will see @@ -265,5 +266,3 @@ struct msgPropDescr_s { }; #endif /* multi-include protection */ -/* vim:set ai: - */ diff --git a/runtime/unicode-helper.h b/runtime/unicode-helper.h index db98ca3..ed06d24 100644 --- a/runtime/unicode-helper.h +++ b/runtime/unicode-helper.h @@ -9,7 +9,7 @@ * * Begun 2009-05-21 RGerhards * - * Copyright (C) 2009-2012 by Rainer Gerhards and Adiscon GmbH + * Copyright (C) 2009-2014 by Rainer Gerhards and Adiscon GmbH * * This file is part of the rsyslog runtime library. * @@ -36,26 +36,26 @@ # define ustrncpy(psz1, psz2, len) strncpy((char*)(psz1), (char*)(psz2), (len)) # define ustrdup(psz) (uchar*)strdup((char*)(psz)) #else - static inline uchar* ustrncpy(uchar *psz1, uchar *psz2, size_t len) + static inline uchar* ustrncpy(uchar *psz1, const uchar *psz2, size_t len) { - return (uchar*) strncpy((char*) psz1, (char*) psz2, len); + return (uchar*) strncpy((char*) psz1, (const char*) psz2, len); } - static inline uchar* ustrdup(uchar *psz) + static inline uchar* ustrdup(const uchar *psz) { - return (uchar*) strdup((char*)psz); + return (uchar*) strdup((const char*)psz); } #endif /* #ifdef DEBUG */ -static inline int ustrcmp(uchar *psz1, uchar *psz2) +static inline int ustrcmp(const uchar *psz1, const uchar *psz2) { return strcmp((char*) psz1, (char*) psz2); } static inline int ustrlen(const uchar *psz) { - return strlen((char*) psz); + return strlen((const char*) psz); } diff --git a/runtime/wti.c b/runtime/wti.c index 77197a9..3e0554a 100644 --- a/runtime/wti.c +++ b/runtime/wti.c @@ -44,12 +44,15 @@ #include "wti.h" #include "obj.h" #include "glbl.h" +#include "action.h" #include "atomic.h" /* static data */ DEFobjStaticHelpers DEFobjCurrIf(glbl) +pthread_key_t thrd_wti_key; + /* forward-definitions */ /* methods */ @@ -171,9 +174,9 @@ BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODE CODESTARTobjDestruct(wti) /* actual destruction */ batchFree(&pThis->batch); + free(pThis->actWrkrInfo); pthread_cond_destroy(&pThis->pcondBusy); DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning); - free(pThis->pszDbgHdr); ENDobjDestruct(wti) @@ -197,11 +200,20 @@ wtiConstructFinalize(wti_t *pThis) ISOBJ_TYPE_assert(pThis, wti); - DBGPRINTF("%s: finalizing construction of worker instance data\n", wtiGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n", + wtiGetDbgHdr(pThis), iActionNbr); /* initialize our thread instance descriptor (no concurrency here) */ pThis->bIsRunning = RSFALSE; + /* must use calloc as we need zero-init */ + CHKmalloc(pThis->actWrkrInfo = calloc(iActionNbr, sizeof(actWrkrInfo_t))); + + if(pThis->pWtp == NULL) { + dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n"); + FINALIZE; + } + /* we now alloc the array for user pointers. We obtain the max from the queue itself. */ CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize)); CHKiRet(batchInit(&pThis->batch, iDeqBatchSize)); @@ -272,71 +284,100 @@ doIdleProcessing(wti_t *pThis, wtp_t *pWtp, int *pbInactivityTOOccured) */ #pragma GCC diagnostic ignored "-Wempty-body" rsRetVal -wtiWorker(wti_t *pThis) +wtiWorker(wti_t *__restrict__ const pThis) { - wtp_t *pWtp; /* our worker thread pool */ + wtp_t *__restrict__ const pWtp = pThis->pWtp; /* our worker thread pool -- shortcut */ + const action_t *__restrict__ pAction; int bInactivityTOOccured = 0; rsRetVal localRet; rsRetVal terminateRet; + actWrkrInfo_t *__restrict__ wrkrInfo; int iCancelStateSave; + int i, j, k; DEFiRet; - ISOBJ_TYPE_assert(pThis, wti); - pWtp = pThis->pWtp; /* shortcut */ - ISOBJ_TYPE_assert(pWtp, wtp); - dbgSetThrdName(pThis->pszDbgHdr); pthread_cleanup_push(wtiWorkerCancelCleanup, pThis); pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave); - + DBGPRINTF("wti %p: worker starting\n", pThis); /* now we have our identity, on to real processing */ - while(1) { /* loop will be broken below - need to do mutex locks */ + + /* note: in this loop, the mutex is "never" unlocked. Of course, + * this is not true: it actually is unlocked when the actual processing + * is done, as part of pWtp->pfDoWork() processing. Note that this + * function is required to re-lock it when done. We cannot do the + * lock/unlock here ourselfs, as pfDoWork() needs to access queue + * structures itself. + * The same goes for pfRateLimiter(). While we could unlock/lock when + * we call it, in practice the function is often called without any + * ratelimiting actually done. Only the rate limiter itself knows + * that. As such, it needs to bear the burden of doing the locking + * when required. -- rgerhards, 2013-11-20 + */ + d_pthread_mutex_lock(pWtp->pmutUsr); + while(1) { /* loop will be broken below */ if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */ pWtp->pfRateLimiter(pWtp->pUsr); } - d_pthread_mutex_lock(pWtp->pmutUsr); - /* first check if we are in shutdown process (but evaluate a bit later) */ terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED); if(terminateRet == RS_RET_TERMINATE_NOW) { /* we now need to free the old batch */ localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis); - DBGOPRINT((obj_t*) pThis, "terminating worker because of TERMINATE_NOW mode, del iRet %d\n", - localRet); - d_pthread_mutex_unlock(pWtp->pmutUsr); + DBGOPRINT((obj_t*) pThis, "terminating worker because of " + "TERMINATE_NOW mode, del iRet %d\n", localRet); break; } /* try to execute and process whatever we have */ - /* Note that this function releases and re-aquires the mutex. The returned - * information on idle state must be processed before releasing the mutex again. - */ localRet = pWtp->pfDoWork(pWtp->pUsr, pThis); if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) { - d_pthread_mutex_unlock(pWtp->pmutUsr); break; /* end of loop */ } else if(localRet == RS_RET_IDLE) { if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccured) { - d_pthread_mutex_unlock(pWtp->pmutUsr); DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, bInactivityTOOccured=%d\n", terminateRet, bInactivityTOOccured); break; /* end of loop */ } doIdleProcessing(pThis, pWtp, &bInactivityTOOccured); - d_pthread_mutex_unlock(pWtp->pmutUsr); continue; /* request next iteration */ } - d_pthread_mutex_unlock(pWtp->pmutUsr); - bInactivityTOOccured = 0; /* reset for next run */ } + d_pthread_mutex_unlock(pWtp->pmutUsr); + + DBGPRINTF("DDDD: wti %p: worker cleanup action instances\n", pThis); + for(i = 0 ; i < iActionNbr ; ++i) { + wrkrInfo = &(pThis->actWrkrInfo[i]); + dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, wrkrInfo->actWrkrData); + if(wrkrInfo->actWrkrData != NULL) { + pAction = wrkrInfo->pAction; + pAction->pMod->mod.om.freeWrkrInstance(wrkrInfo->actWrkrData); + if(pAction->isTransactional) { + /* free iparam "cache" - we need to go through to max! */ + for(j = 0 ; j < wrkrInfo->p.tx.maxIParams ; ++j) { + for(k = 0 ; k < pAction->iNumTpls ; ++k) { + free(actParam(wrkrInfo->p.tx.iparams, + pAction->iNumTpls, j, k).param); + } + } + free(wrkrInfo->p.tx.iparams); + wrkrInfo->p.tx.iparams = NULL; + wrkrInfo->p.tx.currIParam = 0; + wrkrInfo->p.tx.maxIParams = 0; + } + wrkrInfo->actWrkrData = NULL; /* re-init for next activation */ + } + } + /* indicate termination */ pthread_cleanup_pop(0); /* remove cleanup handler */ pthread_setcancelstate(iCancelStateSave, NULL); + dbgprintf("wti %p: worker exiting\n", pThis); RETiRet; } @@ -376,6 +417,33 @@ finalize_it: } +/* This function returns (and creates if necessary) a dummy wti suitable + * for use by the rule engine. It is intended to be used for direct-mode + * main queues (folks, don't do that!). Once created, data is stored in + * thread-specific storage. + * Note: we do NOT do error checking -- if this functions fails, all the + * rest will fail as well... (also, it will only fail under OOM, so...). + * Memleak: we leak pWti's when run in direct mode. However, this is only + * a cosmetic leak, as we need them until all inputs are terminated, + * what means essentially until rsyslog itself is terminated. So we + * don't care -- it's just not nice in valgrind, but that's it. + */ +wti_t * +wtiGetDummy(void) +{ + wti_t *pWti; + + pWti = (wti_t*) pthread_getspecific(thrd_wti_key); + if(pWti == NULL) { + wtiConstruct(&pWti); + wtiConstructFinalize(pWti); + if(pthread_setspecific(thrd_wti_key, pWti) != 0) { + DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n"); + } + } + return pWti; +} + /* dummy */ rsRetVal wtiQueryInterface(void) { return RS_RET_NOT_IMPLEMENTED; } @@ -385,6 +453,7 @@ BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */ CODESTARTObjClassExit(nsdsel_gtls) /* release objects we no longer need */ objRelease(glbl, CORE_COMPONENT); + pthread_key_delete(thrd_wti_key); ENDObjClassExit(wti) @@ -393,8 +462,14 @@ ENDObjClassExit(wti) * rgerhards, 2008-01-09 */ BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */ + int r; /* request objects we use */ CHKiRet(objUse(glbl, CORE_COMPONENT)); + r = pthread_key_create(&thrd_wti_key, NULL); + if(r != 0) { + dbgprintf("wti.c: pthread_key_create failed\n"); + iRet = RS_RET_ERR; + } ENDObjClassInit(wti) /* vi:set ai: diff --git a/runtime/wti.h b/runtime/wti.h index b0dc6c9..b8bdac9 100644 --- a/runtime/wti.h +++ b/runtime/wti.h @@ -1,6 +1,6 @@ /* Definition of the worker thread instance (wti) class. * - * Copyright 2008-2012 Adiscon GmbH. + * Copyright 2008-2013 Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -26,35 +26,189 @@ #include "wtp.h" #include "obj.h" #include "batch.h" +#include "action.h" +#define ACT_STATE_RDY 0 /* action ready, waiting for new transaction */ +#define ACT_STATE_ITX 1 /* transaction active, waiting for new data or commit */ +#define ACT_STATE_COMM 2 /* transaction finished (a transient state) */ +#define ACT_STATE_RTRY 3 /* failure occured, trying to restablish ready state */ +#define ACT_STATE_SUSP 4 /* suspended due to failure (return fail until timeout expired) */ +/* note: 3 bit bit field --> highest value is 7! */ + +typedef struct actWrkrInfo { + action_t *pAction; + void *actWrkrData; + uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an + immediate failure following */ + int iNbrResRtry; /* number of retries since last suspend */ + struct { + unsigned actState : 3; + unsigned bJustResumed : 1; + } flags; + union { + struct { + actWrkrIParams_t *iparams;/* dynamically sized array for transactional outputs */ + int currIParam; + int maxIParams; /* current max */ + } tx; + struct { + actWrkrIParams_t actParams[CONF_OMOD_NUMSTRINGS_MAXSIZE]; + } nontx; + } p; /* short name for "parameters" */ +} actWrkrInfo_t; + /* the worker thread instance class */ struct wti_s { BEGINobjInstance; pthread_t thrdID; /* thread ID */ int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */ sbool bAlwaysRunning; /* should this thread always run? */ + int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */ wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */ - batch_t batch; /* pointer to an object array meaningful for current user pointer (e.g. queue pUsr data elemt) */ + batch_t batch; /* pointer to an object array meaningful for current user + pointer (e.g. queue pUsr data elemt) */ uchar *pszDbgHdr; /* header string for debug messages */ + actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions + (sized for max nbr of actions in config!) */ pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */ DEF_ATOMIC_HELPER_MUT(mutIsRunning); + struct { + uint8_t bPrevWasSuspended; + uint8_t bDoAutoCommit; /* do a commit after each message + * this is usually set for batches with 0 element, but may + * also be added as a user-selectable option (not implemented yet) + */ + } execState; /* state for the execution engine */ }; /* prototypes */ rsRetVal wtiConstruct(wti_t **ppThis); -rsRetVal wtiConstructFinalize(wti_t *pThis); +rsRetVal wtiConstructFinalize(wti_t * const pThis); rsRetVal wtiDestruct(wti_t **ppThis); -rsRetVal wtiWorker(wti_t *pThis); -rsRetVal wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg); -rsRetVal wtiCancelThrd(wti_t *pThis); -rsRetVal wtiSetAlwaysRunning(wti_t *pThis); -rsRetVal wtiSetState(wti_t *pThis, sbool bNew); -rsRetVal wtiWakeupThrd(wti_t *pThis); -sbool wtiGetState(wti_t *pThis); +rsRetVal wtiWorker(wti_t * const pThis); +rsRetVal wtiSetDbgHdr(wti_t * const pThis, uchar *pszMsg, size_t lenMsg); +rsRetVal wtiCancelThrd(wti_t * const pThis); +rsRetVal wtiSetAlwaysRunning(wti_t * const pThis); +rsRetVal wtiSetState(wti_t * const pThis, sbool bNew); +rsRetVal wtiWakeupThrd(wti_t * const pThis); +sbool wtiGetState(wti_t * const pThis); +wti_t *wtiGetDummy(void); PROTOTYPEObjClassInit(wti); PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*); PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*); +static inline uint8_t +getActionStateByNbr(wti_t * const pWti, const int iActNbr) +{ + return((uint8_t) pWti->actWrkrInfo[iActNbr].flags.actState); +} + +static inline uint8_t +getActionState(wti_t * const pWti, action_t * const pAction) +{ + return((uint8_t) pWti->actWrkrInfo[pAction->iActionNbr].flags.actState); +} + +static inline void +setActionState(wti_t * const pWti, action_t * const pAction, uint8_t newState) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.actState = newState; +} + +static inline int +getActionJustResumed(wti_t * const pWti, action_t * const pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].flags.bJustResumed); +} + +static inline void +setActionJustResumed(wti_t * const pWti, action_t * const pAction, int val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].flags.bJustResumed = val; +} + + +static inline uint16_t +getActionResumeInRow(wti_t * const pWti, action_t * const pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow); +} + +static inline void +setActionResumeInRow(wti_t * const pWti, action_t * const pAction, uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow = val; +} + +static inline void +incActionResumeInRow(wti_t * const pWti, action_t * const pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].uResumeOKinRow++; +} + +static inline int +getActionNbrResRtry(wti_t * const pWti, action_t * const pAction) +{ + return(pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry); +} + +static inline void +setActionNbrResRtry(wti_t * const pWti, action_t * const pAction, const uint16_t val) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry = val; +} + +static inline void +incActionNbrResRtry(wti_t * const pWti, action_t * const pAction) +{ + pWti->actWrkrInfo[pAction->iActionNbr].iNbrResRtry++; +} + +/* note: this function is only called once in action.c */ +static inline rsRetVal +wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams) +{ + actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]); + actWrkrIParams_t *iparams; + int newMax; + DEFiRet; + + if(wrkrInfo->p.tx.currIParam == wrkrInfo->p.tx.maxIParams) { + /* we need to extend */ + dbgprintf("DDDD: extending iparams, curr max %d\n", wrkrInfo->p.tx.maxIParams); + newMax = (wrkrInfo->p.tx.maxIParams == 0) ? CONF_IPARAMS_BUFSIZE + : 2 * wrkrInfo->p.tx.maxIParams; +dbgprintf("DDDD: realloc size %u\n", sizeof(actWrkrIParams_t) * pAction->iNumTpls * newMax); + CHKmalloc(iparams = realloc(wrkrInfo->p.tx.iparams, + sizeof(actWrkrIParams_t) * pAction->iNumTpls * newMax)); +dbgprintf("DDDD: setting memory base %u, lenBytes %u, len %u\n", wrkrInfo->p.tx.currIParam * pAction->iNumTpls, + sizeof(actWrkrIParams_t) * pAction->iNumTpls * (newMax - wrkrInfo->p.tx.maxIParams), + pAction->iNumTpls * (newMax - wrkrInfo->p.tx.maxIParams)); + memset(iparams + (wrkrInfo->p.tx.currIParam * pAction->iNumTpls), 0, + sizeof(actWrkrIParams_t) * pAction->iNumTpls * (newMax - wrkrInfo->p.tx.maxIParams)); + wrkrInfo->p.tx.iparams = iparams; + wrkrInfo->p.tx.maxIParams = newMax; + } +dbgprintf("DDDD: adding param %d for action %d\n", wrkrInfo->p.tx.currIParam, pAction->iActionNbr); + *piparams = wrkrInfo->p.tx.iparams + wrkrInfo->p.tx.currIParam * pAction->iNumTpls; + ++wrkrInfo->p.tx.currIParam; + +finalize_it: + RETiRet; +} + +static inline void +wtiInitIParam(actWrkrIParams_t *piparams) +{ + memset(piparams, 0, sizeof(actWrkrIParams_t)); +} + +static inline void +wtiResetExecState(wti_t * const pWti, batch_t * const pBatch) +{ + pWti->execState.bPrevWasSuspended = 0; + pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1); +} #endif /* #ifndef WTI_H_INCLUDED */ diff --git a/runtime/wtp.c b/runtime/wtp.c index 895c1ff..66942e6 100644 --- a/runtime/wtp.c +++ b/runtime/wtp.c @@ -8,7 +8,7 @@ * (and in the web doc set on http://www.rsyslog.com/doc). Be sure to read it * if you are getting aquainted to the object. * - * Copyright 2008,2009 Rainer Gerhards and Adiscon GmbH. + * Copyright 2008-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of the rsyslog runtime library. * @@ -91,6 +91,7 @@ BEGINobjConstruct(wtp) /* be sure to specify the object type also in END macro! pthread_cond_init(&pThis->condThrdTrm, NULL); pthread_attr_init(&pThis->attrThrd); /* Set thread scheduling policy to default */ +#warning do we need this any longer? I think it was a cure for an already fixed bug.. #ifdef HAVE_PTHREAD_SETSCHEDPARAM pthread_attr_setschedpolicy(&pThis->attrThrd, default_thr_sched_policy); pthread_attr_setschedparam(&pThis->attrThrd, &default_sched_param); @@ -121,7 +122,8 @@ wtpConstructFinalize(wtp_t *pThis) ISOBJ_TYPE_assert(pThis, wtp); - DBGPRINTF("%s: finalizing construction of worker thread pool\n", wtpGetDbgHdr(pThis)); + DBGPRINTF("%s: finalizing construction of worker thread pool (numworkerThreads %d)\n", + wtpGetDbgHdr(pThis), pThis->iNumWorkerThreads); /* alloc and construct workers - this can only be done in finalizer as we previously do * not know the max number of workers */ |