diff options
Diffstat (limited to 'plugins/omelasticsearch')
-rw-r--r-- | plugins/omelasticsearch/Makefile.in | 208 | ||||
-rw-r--r-- | plugins/omelasticsearch/README | 4 | ||||
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 241 |
3 files changed, 286 insertions, 167 deletions
diff --git a/plugins/omelasticsearch/Makefile.in b/plugins/omelasticsearch/Makefile.in index 0dcc30e..d6b3191 100644 --- a/plugins/omelasticsearch/Makefile.in +++ b/plugins/omelasticsearch/Makefile.in @@ -1,9 +1,8 @@ -# Makefile.in generated by automake 1.11.3 from Makefile.am. +# Makefile.in generated by automake 1.13.4 from Makefile.am. # @configure_input@ -# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, -# 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011 Free Software -# Foundation, Inc. +# Copyright (C) 1994-2013 Free Software Foundation, Inc. + # This Makefile.in is free software; the Free Software Foundation # gives unlimited permission to copy and/or distribute it, # with or without modifications, as long as this notice is preserved. @@ -16,6 +15,51 @@ @SET_MAKE@ VPATH = @srcdir@ +am__is_gnu_make = test -n '$(MAKEFILE_LIST)' && test -n '$(MAKELEVEL)' +am__make_running_with_option = \ + case $${target_option-} in \ + ?) ;; \ + *) echo "am__make_running_with_option: internal error: invalid" \ + "target option '$${target_option-}' specified" >&2; \ + exit 1;; \ + esac; \ + has_opt=no; \ + sane_makeflags=$$MAKEFLAGS; \ + if $(am__is_gnu_make); then \ + sane_makeflags=$$MFLAGS; \ + else \ + case $$MAKEFLAGS in \ + *\\[\ \ ]*) \ + bs=\\; \ + sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \ + | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \ + esac; \ + fi; \ + skip_next=no; \ + strip_trailopt () \ + { \ + flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \ + }; \ + for flg in $$sane_makeflags; do \ + test $$skip_next = yes && { skip_next=no; continue; }; \ + case $$flg in \ + *=*|--*) continue;; \ + -*I) strip_trailopt 'I'; skip_next=yes;; \ + -*I?*) strip_trailopt 'I';; \ + -*O) strip_trailopt 'O'; skip_next=yes;; \ + -*O?*) strip_trailopt 'O';; \ + -*l) strip_trailopt 'l'; skip_next=yes;; \ + -*l?*) strip_trailopt 'l';; \ + -[dEDm]) skip_next=yes;; \ + -[JT]) skip_next=yes;; \ + esac; \ + case $$flg in \ + *$$target_option*) has_opt=yes; break;; \ + esac; \ + done; \ + test $$has_opt = yes +am__make_dryrun = (target_option=n; $(am__make_running_with_option)) +am__make_keepgoing = (target_option=k; $(am__make_running_with_option)) pkgdatadir = $(datadir)/@PACKAGE@ pkgincludedir = $(includedir)/@PACKAGE@ pkglibdir = $(libdir)/@PACKAGE@ @@ -35,7 +79,8 @@ POST_UNINSTALL = : build_triplet = @build@ host_triplet = @host@ subdir = plugins/omelasticsearch -DIST_COMMON = README $(srcdir)/Makefile.am $(srcdir)/Makefile.in +DIST_COMMON = $(srcdir)/Makefile.in $(srcdir)/Makefile.am \ + $(top_srcdir)/depcomp README ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 am__aclocal_m4_deps = $(top_srcdir)/m4/atomic_operations.m4 \ $(top_srcdir)/m4/atomic_operations_64bit.m4 \ @@ -87,10 +132,23 @@ omelasticsearch_la_OBJECTS = $(am_omelasticsearch_la_OBJECTS) AM_V_lt = $(am__v_lt_@AM_V@) am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@) am__v_lt_0 = --silent +am__v_lt_1 = omelasticsearch_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \ $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CCLD) \ $(AM_CFLAGS) $(CFLAGS) $(omelasticsearch_la_LDFLAGS) \ $(LDFLAGS) -o $@ +AM_V_P = $(am__v_P_@AM_V@) +am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) +am__v_P_0 = false +am__v_P_1 = : +AM_V_GEN = $(am__v_GEN_@AM_V@) +am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) +am__v_GEN_0 = @echo " GEN " $@; +am__v_GEN_1 = +AM_V_at = $(am__v_at_@AM_V@) +am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) +am__v_at_0 = @ +am__v_at_1 = DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir) depcomp = $(SHELL) $(top_srcdir)/depcomp am__depfiles_maybe = depfiles @@ -103,22 +161,40 @@ LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ $(AM_CFLAGS) $(CFLAGS) AM_V_CC = $(am__v_CC_@AM_V@) am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@) -am__v_CC_0 = @echo " CC " $@; -AM_V_at = $(am__v_at_@AM_V@) -am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) -am__v_at_0 = @ +am__v_CC_0 = @echo " CC " $@; +am__v_CC_1 = CCLD = $(CC) LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \ $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \ $(AM_LDFLAGS) $(LDFLAGS) -o $@ AM_V_CCLD = $(am__v_CCLD_@AM_V@) am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@) -am__v_CCLD_0 = @echo " CCLD " $@; -AM_V_GEN = $(am__v_GEN_@AM_V@) -am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) -am__v_GEN_0 = @echo " GEN " $@; +am__v_CCLD_0 = @echo " CCLD " $@; +am__v_CCLD_1 = SOURCES = $(omelasticsearch_la_SOURCES) DIST_SOURCES = $(omelasticsearch_la_SOURCES) +am__can_run_installinfo = \ + case $$AM_UPDATE_INFO_DIR in \ + n|no|NO) false;; \ + *) (install-info --version) >/dev/null 2>&1;; \ + esac +am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP) +# Read a list of newline-separated strings from the standard input, +# and print each of them once, without duplicates. Input order is +# *not* preserved. +am__uniquify_input = $(AWK) '\ + BEGIN { nonempty = 0; } \ + { items[$$0] = 1; nonempty = 1; } \ + END { if (nonempty) { for (i in items) print i; }; } \ +' +# Make sure the list of sources is unique. This is necessary because, +# e.g., the same source file might be shared among _SOURCES variables +# for different programs/libraries. +am__define_uniq_tagged_files = \ + list='$(am__tagged_files)'; \ + unique=`for i in $$list; do \ + if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ + done | $(am__uniquify_input)` ETAGS = etags CTAGS = ctags DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) @@ -178,8 +254,6 @@ LEXLIB = @LEXLIB@ LEX_OUTPUT_ROOT = @LEX_OUTPUT_ROOT@ LIBDBI_CFLAGS = @LIBDBI_CFLAGS@ LIBDBI_LIBS = @LIBDBI_LIBS@ -LIBEE_CFLAGS = @LIBEE_CFLAGS@ -LIBEE_LIBS = @LIBEE_LIBS@ LIBESTR_CFLAGS = @LIBESTR_CFLAGS@ LIBESTR_LIBS = @LIBESTR_LIBS@ LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@ @@ -355,9 +429,9 @@ $(top_srcdir)/configure: $(am__configure_deps) $(ACLOCAL_M4): $(am__aclocal_m4_deps) cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh $(am__aclocal_m4_deps): + install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES) @$(NORMAL_INSTALL) - test -z "$(pkglibdir)" || $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" @list='$(pkglib_LTLIBRARIES)'; test -n "$(pkglibdir)" || list=; \ list2=; for p in $$list; do \ if test -f $$p; then \ @@ -365,6 +439,8 @@ install-pkglibLTLIBRARIES: $(pkglib_LTLIBRARIES) else :; fi; \ done; \ test -z "$$list2" || { \ + echo " $(MKDIR_P) '$(DESTDIR)$(pkglibdir)'"; \ + $(MKDIR_P) "$(DESTDIR)$(pkglibdir)" || exit 1; \ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(pkglibdir)'"; \ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(pkglibdir)"; \ } @@ -380,12 +456,14 @@ uninstall-pkglibLTLIBRARIES: clean-pkglibLTLIBRARIES: -test -z "$(pkglib_LTLIBRARIES)" || rm -f $(pkglib_LTLIBRARIES) - @list='$(pkglib_LTLIBRARIES)'; for p in $$list; do \ - dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \ - test "$$dir" != "$$p" || dir=.; \ - echo "rm -f \"$${dir}/so_locations\""; \ - rm -f "$${dir}/so_locations"; \ - done + @list='$(pkglib_LTLIBRARIES)'; \ + locs=`for p in $$list; do echo $$p; done | \ + sed 's|^[^/]*$$|.|; s|/[^/]*$$||; s|$$|/so_locations|' | \ + sort -u`; \ + test -z "$$locs" || { \ + echo rm -f $${locs}; \ + rm -f $${locs}; \ + } cJSON/$(am__dirstamp): @$(MKDIR_P) cJSON @: > cJSON/$(am__dirstamp) @@ -394,13 +472,14 @@ cJSON/$(DEPDIR)/$(am__dirstamp): @: > cJSON/$(DEPDIR)/$(am__dirstamp) cJSON/omelasticsearch_la-cjson.lo: cJSON/$(am__dirstamp) \ cJSON/$(DEPDIR)/$(am__dirstamp) + omelasticsearch.la: $(omelasticsearch_la_OBJECTS) $(omelasticsearch_la_DEPENDENCIES) $(EXTRA_omelasticsearch_la_DEPENDENCIES) $(AM_V_CCLD)$(omelasticsearch_la_LINK) -rpath $(pkglibdir) $(omelasticsearch_la_OBJECTS) $(omelasticsearch_la_LIBADD) $(LIBS) mostlyclean-compile: -rm -f *.$(OBJEXT) - -rm -f cJSON/omelasticsearch_la-cjson.$(OBJEXT) - -rm -f cJSON/omelasticsearch_la-cjson.lo + -rm -f cJSON/*.$(OBJEXT) + -rm -f cJSON/*.lo distclean-compile: -rm -f *.tab.c @@ -453,26 +532,15 @@ clean-libtool: -rm -rf .libs _libs -rm -rf cJSON/.libs cJSON/_libs -ID: $(HEADERS) $(SOURCES) $(LISP) $(TAGS_FILES) - list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ - unique=`for i in $$list; do \ - if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ - done | \ - $(AWK) '{ files[$$0] = 1; nonempty = 1; } \ - END { if (nonempty) { for (i in files) print i; }; }'`; \ - mkid -fID $$unique -tags: TAGS - -TAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ - $(TAGS_FILES) $(LISP) +ID: $(am__tagged_files) + $(am__define_uniq_tagged_files); mkid -fID $$unique +tags: tags-am +TAGS: tags + +tags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) set x; \ here=`pwd`; \ - list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ - unique=`for i in $$list; do \ - if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ - done | \ - $(AWK) '{ files[$$0] = 1; nonempty = 1; } \ - END { if (nonempty) { for (i in files) print i; }; }'`; \ + $(am__define_uniq_tagged_files); \ shift; \ if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \ test -n "$$unique" || unique=$$empty_fix; \ @@ -484,15 +552,11 @@ TAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ $$unique; \ fi; \ fi -ctags: CTAGS -CTAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \ - $(TAGS_FILES) $(LISP) - list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \ - unique=`for i in $$list; do \ - if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \ - done | \ - $(AWK) '{ files[$$0] = 1; nonempty = 1; } \ - END { if (nonempty) { for (i in files) print i; }; }'`; \ +ctags: ctags-am + +CTAGS: ctags +ctags-am: $(TAGS_DEPENDENCIES) $(am__tagged_files) + $(am__define_uniq_tagged_files); \ test -z "$(CTAGS_ARGS)$$unique" \ || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \ $$unique @@ -501,6 +565,21 @@ GTAGS: here=`$(am__cd) $(top_builddir) && pwd` \ && $(am__cd) $(top_srcdir) \ && gtags -i $(GTAGS_ARGS) "$$here" +cscopelist: cscopelist-am + +cscopelist-am: $(am__tagged_files) + list='$(am__tagged_files)'; \ + case "$(srcdir)" in \ + [\\/]* | ?:[\\/]*) sdir="$(srcdir)" ;; \ + *) sdir=$(subdir)/$(srcdir) ;; \ + esac; \ + for i in $$list; do \ + if test -f "$$i"; then \ + echo "$(subdir)/$$i"; \ + else \ + echo "$$sdir/$$i"; \ + fi; \ + done >> $(top_builddir)/cscope.files distclean-tags: -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags @@ -647,19 +726,20 @@ uninstall-am: uninstall-pkglibLTLIBRARIES .MAKE: install-am install-strip -.PHONY: CTAGS GTAGS all all-am check check-am clean clean-generic \ - clean-libtool clean-pkglibLTLIBRARIES ctags distclean \ - distclean-compile distclean-generic distclean-libtool \ - distclean-tags distdir dvi dvi-am html html-am info info-am \ - install install-am install-data install-data-am install-dvi \ - install-dvi-am install-exec install-exec-am install-html \ - install-html-am install-info install-info-am install-man \ - install-pdf install-pdf-am install-pkglibLTLIBRARIES \ - install-ps install-ps-am install-strip installcheck \ - installcheck-am installdirs maintainer-clean \ - maintainer-clean-generic mostlyclean mostlyclean-compile \ - mostlyclean-generic mostlyclean-libtool pdf pdf-am ps ps-am \ - tags uninstall uninstall-am uninstall-pkglibLTLIBRARIES +.PHONY: CTAGS GTAGS TAGS all all-am check check-am clean clean-generic \ + clean-libtool clean-pkglibLTLIBRARIES cscopelist-am ctags \ + ctags-am distclean distclean-compile distclean-generic \ + distclean-libtool distclean-tags distdir dvi dvi-am html \ + html-am info info-am install install-am install-data \ + install-data-am install-dvi install-dvi-am install-exec \ + install-exec-am install-html install-html-am install-info \ + install-info-am install-man install-pdf install-pdf-am \ + install-pkglibLTLIBRARIES install-ps install-ps-am \ + install-strip installcheck installcheck-am installdirs \ + maintainer-clean maintainer-clean-generic mostlyclean \ + mostlyclean-compile mostlyclean-generic mostlyclean-libtool \ + pdf pdf-am ps ps-am tags tags-am uninstall uninstall-am \ + uninstall-pkglibLTLIBRARIES # Tell versions [3.59,3.63) of GNU make to not export all variables. diff --git a/plugins/omelasticsearch/README b/plugins/omelasticsearch/README index 9021bc0..b8bf415 100644 --- a/plugins/omelasticsearch/README +++ b/plugins/omelasticsearch/README @@ -1,3 +1,7 @@ +How to access ElasticSearch on local machine (for testing): +=========================================================== +see: https://github.com/mobz/elasticsearch-head + How to produce an error: ======================== It's quite easy to get 400, if you put a wrong mapping to your diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 307f85d..f59ed0a 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -69,8 +69,8 @@ STATSCOUNTER_DEF(indexESFail, mutIndexESFail) typedef struct curl_slist HEADER; typedef struct _instanceData { int port; - int replyLen; int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; uchar *server; uchar *uid; uchar *pwd; @@ -80,24 +80,30 @@ typedef struct _instanceData { uchar *tplName; uchar *timeout; uchar *bulkId; - uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; - char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; sbool dynBulkId; sbool bulkmode; sbool asyncRepl; + sbool useHttps; +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + int replyLen; + char *reply; + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ + uchar *restURL; /* last used URL for error reporting */ struct { es_str_t *data; int nmemb; /* number of messages in batch (for statistics counting) */ uchar *currTpl1; uchar *currTpl2; } batch; - CURL *curlHandle; /* libcurl session handle */ - HEADER *postHeader; /* json POST request info */ -} instanceData; +} wrkrInstanceData_t; /* tables for interfacing with the v6 config system */ @@ -115,9 +121,10 @@ static struct cnfparamdescr actpdescr[] = { { "dynparent", eCmdHdlrBinary, 0 }, { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, + { "usehttps", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 }, + { "template", eCmdHdlrGetWord, 0 }, { "dynbulkid", eCmdHdlrBinary, 0 }, { "bulkid", eCmdHdlrGetWord, 0 }, }; @@ -127,12 +134,32 @@ static struct cnfparamblk actpblk = actpdescr }; +static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData); + BEGINcreateInstance CODESTARTcreateInstance - pData->restURL = NULL; pData->fdErrFile = -1; + pthread_mutex_init(&pData->mutErrFile, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +dbgprintf("omelasticsearch: createWrkrInstance\n"); + pWrkrData->restURL = NULL; + if(pData->bulkmode) { + pWrkrData->batch.currTpl1 = NULL; + pWrkrData->batch.currTpl2 = NULL; + if((pWrkrData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + CHKiRet(curlSetup(pWrkrData, pWrkrData->pData)); +finalize_it: +dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData); +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -141,16 +168,9 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - if (pData->postHeader) { - curl_slist_free_all(pData->postHeader); - pData->postHeader = NULL; - } - if (pData->curlHandle) { - curl_easy_cleanup(pData->curlHandle); - pData->curlHandle = NULL; - } if(pData->fdErrFile != -1) close(pData->fdErrFile); + pthread_mutex_destroy(&pData->mutErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -159,11 +179,23 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); - free(pData->restURL); free(pData->errorFile); free(pData->bulkId); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->postHeader) { + curl_slist_free_all(pWrkrData->postHeader); + pWrkrData->postHeader = NULL; + } + if(pWrkrData->curlHandle) { + curl_easy_cleanup(pWrkrData->curlHandle); + pWrkrData->curlHandle = NULL; + } + free(pWrkrData->restURL); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("omelasticsearch\n"); @@ -180,6 +212,7 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); + dbgprintf("\tuse https=%d\n", pData->useHttps); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? (uchar*)"(not configured)" : pData->errorFile); @@ -201,7 +234,12 @@ setBaseURL(instanceData *pData, es_str_t **url) *url = es_newStr(128); snprintf(portBuf, sizeof(portBuf), "%d", pData->port); - r = es_addBuf(url, "http://", sizeof("http://")-1); + if (pData->useHttps) { + r = es_addBuf(url, "https://", sizeof("https://")-1); + } + else { + r = es_addBuf(url, "http://", sizeof("http://")-1); + } if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server)); if(r == 0) r = es_addChar(url, ':'); if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf)); @@ -211,7 +249,7 @@ setBaseURL(instanceData *pData, es_str_t **url) static inline rsRetVal -checkConn(instanceData *pData) +checkConn(wrkrInstanceData_t *pWrkrData) { es_str_t *url; CURL *curl = NULL; @@ -219,7 +257,7 @@ checkConn(instanceData *pData) char *cstr; DEFiRet; - setBaseURL(pData, &url); + setBaseURL(pWrkrData->pData, &url); curl = curl_easy_init(); if(curl == NULL) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); @@ -235,16 +273,16 @@ checkConn(instanceData *pData) curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - pData->reply = NULL; - pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } - free(pData->reply); + free(pWrkrData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -257,7 +295,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); - iRet = checkConn(pData); + iRet = checkConn(pWrkrData); ENDtryResume @@ -330,7 +368,7 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, static rsRetVal -setCurlURL(instanceData *pData, uchar **tpls) +setCurlURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls) { char authBuf[1024]; uchar *searchIndex; @@ -368,11 +406,11 @@ setCurlURL(instanceData *pData, uchar **tpls) if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); } - free(pData->restURL); - pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + free(pWrkrData->restURL); + pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_URL, pWrkrData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -383,8 +421,8 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); - curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: RETiRet; @@ -396,7 +434,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar **tpls) +buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -411,29 +449,29 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); - r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId); + r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType, ustrlen(searchType)); if(parent != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent)); } if(bulkId != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId)); } - if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); } - ++pData->batch.nmemb; + ++pWrkrData->batch.nmemb; iRet = RS_RET_DEFER_COMMIT; finalize_it: @@ -446,7 +484,7 @@ finalize_it: * needs to be closed, HUP must be sent. */ static inline rsRetVal -writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) { char *rendered = NULL; cJSON *errRoot; @@ -454,6 +492,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON *replyRoot = *pReplyRoot; size_t toWrite; ssize_t wrRet; + sbool bMutLocked = 0; char errStr[1024]; DEFiRet; @@ -463,6 +502,9 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) FINALIZE; } + pthread_mutex_lock(&pData->mutErrFile); + bMutLocked = 1; + if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, @@ -474,7 +516,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) } } if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); - cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pWrkrData->restURL)); cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); @@ -495,13 +537,15 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) *pReplyRoot = NULL; /* tell caller not to delete once again! */ finalize_it: + if(bMutLocked) + pthread_mutex_unlock(&pData->mutErrFile); free(rendered); RETiRet; } static inline rsRetVal -checkResultBulkmode(instanceData *pData, cJSON *root) +checkResultBulkmode(wrkrInstanceData_t *pWrkrData, cJSON *root) { int i; int numitems; @@ -515,7 +559,7 @@ checkResultBulkmode(instanceData *pData, cJSON *root) if(items == NULL || items->type != cJSON_Array) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " "bulkmode insert does not return array, reply is: %s\n", - pData->reply); + pWrkrData->reply); ABORT_FINALIZE(RS_RET_DATAFAIL); } numitems = cJSON_GetArraySize(items); @@ -547,20 +591,20 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData, uchar *reqmsg) +checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) { cJSON *root; cJSON *ok; DEFiRet; - root = cJSON_Parse(pData->reply); + root = cJSON_Parse(pWrkrData->reply); if(root == NULL) { DBGPRINTF("omelasticsearch: could not parse JSON result \n"); ABORT_FINALIZE(RS_RET_ERR); } - if(pData->bulkmode) { - iRet = checkResultBulkmode(pData, root); + if(pWrkrData->pData->bulkmode) { + iRet = checkResultBulkmode(pWrkrData, root); } else { ok = cJSON_GetObjectItem(root, "ok"); if(ok == NULL || ok->type != cJSON_True) { @@ -573,7 +617,7 @@ checkResult(instanceData *pData, uchar *reqmsg) */ if(iRet == RS_RET_DATAFAIL) { STATSCOUNTER_INC(indexESFail, mutIndexESFail); - writeDataError(pData, &root, reqmsg); + writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg); iRet = RS_RET_OK; /* we have handled the problem! */ } @@ -588,19 +632,19 @@ finalize_it: static rsRetVal -curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs) +curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, int nmsgs) { CURLcode code; - CURL *curl = pData->curlHandle; + CURL *curl = pWrkrData->curlHandle; DEFiRet; - pData->reply = NULL; - pData->replyLen = 0; + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; - if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) - CHKiRet(setCurlURL(pData, tpls)); + if(pWrkrData->pData->dynSrchIdx || pWrkrData->pData->dynSrchType || pWrkrData->pData->dynParent) + CHKiRet(setCurlURL(pWrkrData, pWrkrData->pData, tpls)); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); @@ -619,27 +663,27 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsg break; } - DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen); - if (pData->replyLen > 0) { - pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen); + if(pWrkrData->replyLen > 0) { + pWrkrData->reply[pWrkrData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ } - DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply); + DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply); - CHKiRet(checkResult(pData, message)); + CHKiRet(checkResult(pWrkrData, message)); finalize_it: - free(pData->reply); + free(pWrkrData->reply); RETiRet; } BEGINbeginTransaction CODESTARTbeginTransaction -dbgprintf("omelasticsearch: beginTransaction\n"); - if(!pData->bulkmode) { +dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData); + if(!pWrkrData->pData->bulkmode) { FINALIZE; } - es_emptyStr(pData->batch.data); - pData->batch.nmemb = 0; + es_emptyStr(pWrkrData->batch.data); + pWrkrData->batch.nmemb = 0; finalize_it: ENDbeginTransaction @@ -647,14 +691,14 @@ ENDbeginTransaction BEGINdoAction CODESTARTdoAction STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); - if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString)); + if(pWrkrData->pData->bulkmode) { + CHKiRet(buildBatch(pWrkrData, ppString[0], ppString)); } else { - CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), + CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), ppString, 1)); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode); ENDdoAction @@ -663,13 +707,13 @@ BEGINendTransaction CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); /* End Transaction only if batch data is not empty */ - if (pData->batch.data != NULL ) { - cstr = es_str2cstr(pData->batch.data, NULL); + if (pWrkrData->batch.data != NULL ) { + cstr = es_str2cstr(pWrkrData->batch.data, NULL); dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb)); + CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb)); } else - dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n"); + dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, nothing to send. \n"); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -680,24 +724,24 @@ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { char *p = (char *)ptr; - instanceData *pData = (instanceData*) userdata; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) userdata; char *buf; size_t newlen; - newlen = pData->replyLen + size*nmemb; - if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + newlen = pWrkrData->replyLen + size*nmemb; + if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) { DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); return 0; /* abort due to failure */ } - memcpy(buf+pData->replyLen, p, size*nmemb); - pData->replyLen = newlen; - pData->reply = buf; + memcpy(buf+pWrkrData->replyLen, p, size*nmemb); + pWrkrData->replyLen = newlen; + pWrkrData->reply = buf; return size*nmemb; } static rsRetVal -curlSetup(instanceData *pData) +curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData) { HEADER *header; CURL *handle; @@ -713,13 +757,13 @@ curlSetup(instanceData *pData) curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); curl_easy_setopt(handle, CURLOPT_POST, 1); - pData->curlHandle = handle; - pData->postHeader = header; + pWrkrData->curlHandle = handle; + pWrkrData->postHeader = header; if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL); + setCurlURL(pWrkrData, pData, NULL); } if(Debug) { @@ -746,6 +790,7 @@ setInstParamDefaults(instanceData *pData) pData->dynSrchType = 0; pData->dynParent = 0; pData->asyncRepl = 0; + pData->useHttps = 0; pData->bulkmode = 0; pData->tplName = NULL; pData->errorFile = NULL; @@ -796,6 +841,8 @@ CODESTARTnewActInst pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { pData->asyncRepl = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "usehttps")) { + pData->useHttps = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { @@ -839,16 +886,6 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - if(pData->bulkmode) { - pData->batch.currTpl1 = NULL; - pData->batch.currTpl2 = NULL; - if((pData->batch.data = es_newStr(1024)) == NULL) { - DBGPRINTF("omelasticsearch: error creating batch string " - "turned off bulk mode\n"); - pData->bulkmode = 0; /* at least it works */ - } - } - iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; @@ -940,9 +977,6 @@ CODESTARTnewActInst pData->searchIndex = (uchar*) strdup("system"); if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); - - CHKiRet(curlSetup(pData)); - CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -980,6 +1014,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_doHUP |