diff options
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r-- | plugins/omelasticsearch/omelasticsearch.c | 313 |
1 files changed, 187 insertions, 126 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index 33e58c1..68fb3c1 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -4,7 +4,7 @@ * NOTE: read comments in module-template.h for more specifics! * * Copyright 2011 Nathan Scott. - * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH. + * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH. * * This file is part of rsyslog. * @@ -58,10 +58,10 @@ DEFobjCurrIf(errmsg) DEFobjCurrIf(statsobj) statsobj_t *indexStats; -STATSCOUNTER_DEF(indexConFail, mutIndexConFail) STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) -STATSCOUNTER_DEF(indexFailed, mutIndexFailed) -STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) +STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail) +STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail) +STATSCOUNTER_DEF(indexESFail, mutIndexESFail) /* REST API for elasticsearch hits this URL: * http://<hostName>:<restPort>/<searchIndex>/<searchType> @@ -69,8 +69,8 @@ STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) typedef struct curl_slist HEADER; typedef struct _instanceData { int port; - int replyLen; int fdErrFile; /* error file fd or -1 if not open */ + pthread_mutex_t mutErrFile; uchar *server; uchar *uid; uchar *pwd; @@ -80,23 +80,30 @@ typedef struct _instanceData { uchar *tplName; uchar *timeout; uchar *bulkId; - uchar *restURL; /* last used URL for error reporting */ uchar *errorFile; - char *reply; sbool dynSrchIdx; sbool dynSrchType; sbool dynParent; sbool dynBulkId; sbool bulkmode; sbool asyncRepl; + sbool useHttps; +} instanceData; + +typedef struct wrkrInstanceData { + instanceData *pData; + int replyLen; + char *reply; + CURL *curlHandle; /* libcurl session handle */ + HEADER *postHeader; /* json POST request info */ + uchar *restURL; /* last used URL for error reporting */ struct { es_str_t *data; + int nmemb; /* number of messages in batch (for statistics counting) */ uchar *currTpl1; uchar *currTpl2; } batch; - CURL *curlHandle; /* libcurl session handle */ - HEADER *postHeader; /* json POST request info */ -} instanceData; +} wrkrInstanceData_t; /* tables for interfacing with the v6 config system */ @@ -114,9 +121,10 @@ static struct cnfparamdescr actpdescr[] = { { "dynparent", eCmdHdlrBinary, 0 }, { "bulkmode", eCmdHdlrBinary, 0 }, { "asyncrepl", eCmdHdlrBinary, 0 }, + { "usehttps", eCmdHdlrBinary, 0 }, { "timeout", eCmdHdlrGetWord, 0 }, { "errorfile", eCmdHdlrGetWord, 0 }, - { "template", eCmdHdlrGetWord, 1 }, + { "template", eCmdHdlrGetWord, 0 }, { "dynbulkid", eCmdHdlrBinary, 0 }, { "bulkid", eCmdHdlrGetWord, 0 }, }; @@ -126,12 +134,32 @@ static struct cnfparamblk actpblk = actpdescr }; +static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData); + BEGINcreateInstance CODESTARTcreateInstance - pData->restURL = NULL; pData->fdErrFile = -1; + pthread_mutex_init(&pData->mutErrFile, NULL); ENDcreateInstance +BEGINcreateWrkrInstance +CODESTARTcreateWrkrInstance +dbgprintf("omelasticsearch: createWrkrInstance\n"); + pWrkrData->restURL = NULL; + if(pData->bulkmode) { + pWrkrData->batch.currTpl1 = NULL; + pWrkrData->batch.currTpl2 = NULL; + if((pWrkrData->batch.data = es_newStr(1024)) == NULL) { + DBGPRINTF("omelasticsearch: error creating batch string " + "turned off bulk mode\n"); + pData->bulkmode = 0; /* at least it works */ + } + } + CHKiRet(curlSetup(pWrkrData, pWrkrData->pData)); +finalize_it: +dbgprintf("DDDD: createWrkrInstance,pData %p/%p, pWrkrData %p\n", pData, pWrkrData->pData, pWrkrData); +ENDcreateWrkrInstance + BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature if(eFeat == sFEATURERepeatedMsgReduction) @@ -140,16 +168,9 @@ ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance - if (pData->postHeader) { - curl_slist_free_all(pData->postHeader); - pData->postHeader = NULL; - } - if (pData->curlHandle) { - curl_easy_cleanup(pData->curlHandle); - pData->curlHandle = NULL; - } if(pData->fdErrFile != -1) close(pData->fdErrFile); + pthread_mutex_destroy(&pData->mutErrFile); free(pData->server); free(pData->uid); free(pData->pwd); @@ -158,11 +179,23 @@ CODESTARTfreeInstance free(pData->parent); free(pData->tplName); free(pData->timeout); - free(pData->restURL); free(pData->errorFile); free(pData->bulkId); ENDfreeInstance +BEGINfreeWrkrInstance +CODESTARTfreeWrkrInstance + if(pWrkrData->postHeader) { + curl_slist_free_all(pWrkrData->postHeader); + pWrkrData->postHeader = NULL; + } + if(pWrkrData->curlHandle) { + curl_easy_cleanup(pWrkrData->curlHandle); + pWrkrData->curlHandle = NULL; + } + free(pWrkrData->restURL); +ENDfreeWrkrInstance + BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo dbgprintf("omelasticsearch\n"); @@ -179,6 +212,7 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType); dbgprintf("\tdynamic parent=%d\n", pData->dynParent); dbgprintf("\tasync replication=%d\n", pData->asyncRepl); + dbgprintf("\tuse https=%d\n", pData->useHttps); dbgprintf("\tbulkmode=%d\n", pData->bulkmode); dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ? (uchar*)"(not configured)" : pData->errorFile); @@ -200,17 +234,22 @@ setBaseURL(instanceData *pData, es_str_t **url) *url = es_newStr(128); snprintf(portBuf, sizeof(portBuf), "%d", pData->port); - r = es_addBuf(url, "http://", sizeof("http://")-1); + if (pData->useHttps) { + r = es_addBuf(url, "https://", sizeof("https://")-1); + } + else { + r = es_addBuf(url, "http://", sizeof("http://")-1); + } if(r == 0) r = es_addBuf(url, (char*)pData->server, strlen((char*)pData->server)); if(r == 0) r = es_addChar(url, ':'); if(r == 0) r = es_addBuf(url, portBuf, strlen(portBuf)); - if(r == 0) r = es_addChar(url, '/'); + if(r == 0) es_addChar(url, '/'); RETiRet; } static inline rsRetVal -checkConn(instanceData *pData) +checkConn(wrkrInstanceData_t *pWrkrData) { es_str_t *url; CURL *curl = NULL; @@ -218,26 +257,32 @@ checkConn(instanceData *pData) char *cstr; DEFiRet; - setBaseURL(pData, &url); + setBaseURL(pWrkrData->pData, &url); curl = curl_easy_init(); if(curl == NULL) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n"); ABORT_FINALIZE(RS_RET_SUSPENDED); } + /* Bodypart of request not needed, so set curl opt to nobody and httpget, otherwise lib-curl could sigsegv */ + curl_easy_setopt(curl, CURLOPT_HTTPGET, TRUE); + curl_easy_setopt(curl, CURLOPT_NOBODY, TRUE); + /* Only enable for debugging + curl_easy_setopt(curl, CURLOPT_VERBOSE, TRUE); */ + cstr = es_str2cstr(url, NULL); curl_easy_setopt(curl, CURLOPT_URL, cstr); free(cstr); - pData->reply = NULL; - pData->replyLen = 0; - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); res = curl_easy_perform(curl); if(res != CURLE_OK) { DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() " "failed: %s\n", curl_easy_strerror(res)); ABORT_FINALIZE(RS_RET_SUSPENDED); } - free(pData->reply); + free(pWrkrData->reply); DBGPRINTF("omelasticsearch: checkConn() completed with success\n"); finalize_it: @@ -250,7 +295,7 @@ finalize_it: BEGINtryResume CODESTARTtryResume DBGPRINTF("omelasticsearch: tryResume called\n"); - iRet = checkConn(pData); + iRet = checkConn(pWrkrData); ENDtryResume @@ -323,7 +368,7 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls, static rsRetVal -setCurlURL(instanceData *pData, uchar **tpls) +setCurlURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls) { char authBuf[1024]; uchar *searchIndex; @@ -358,14 +403,14 @@ setCurlURL(instanceData *pData, uchar **tpls) } if(parent != NULL) { if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1); - if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent)); + if(r == 0) es_addBuf(&url, (char*)parent, ustrlen(parent)); } - free(pData->restURL); - pData->restURL = (uchar*)es_str2cstr(url, NULL); - curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL); + free(pWrkrData->restURL); + pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_URL, pWrkrData->restURL); es_deleteStr(url); - DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL); + DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL); if(pData->uid != NULL) { rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid, @@ -376,8 +421,8 @@ setCurlURL(instanceData *pData, uchar **tpls) rLocal); ABORT_FINALIZE(RS_RET_ERR); } - curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf); - curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_USERPWD, authBuf); + curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY); } finalize_it: RETiRet; @@ -389,7 +434,7 @@ finalize_it: * index changes. */ static rsRetVal -buildBatch(instanceData *pData, uchar *message, uchar **tpls) +buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) { int length = strlen((char *)message); int r; @@ -404,28 +449,29 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls) # define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" - getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId); - r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex, + getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId); + r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); - if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType, + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType, ustrlen(searchType)); if(parent != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent)); } if(bulkId != NULL) { - if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId)); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId)); } - if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1); - if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length); - if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length); + if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1); if(r != 0) { DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r); ABORT_FINALIZE(RS_RET_ERR); } + ++pWrkrData->batch.nmemb; iRet = RS_RET_DEFER_COMMIT; finalize_it: @@ -438,7 +484,7 @@ finalize_it: * needs to be closed, HUP must be sent. */ static inline rsRetVal -writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) +writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) { char *rendered = NULL; cJSON *errRoot; @@ -446,6 +492,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) cJSON *replyRoot = *pReplyRoot; size_t toWrite; ssize_t wrRet; + sbool bMutLocked = 0; char errStr[1024]; DEFiRet; @@ -455,6 +502,9 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) FINALIZE; } + pthread_mutex_lock(&pData->mutErrFile); + bMutLocked = 1; + if(pData->fdErrFile == -1) { pData->fdErrFile = open((char*)pData->errorFile, O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, @@ -466,7 +516,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) } } if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); - cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL)); + cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pWrkrData->restURL)); cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg)); if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR); @@ -483,19 +533,19 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg) DBGPRINTF("omelasticsearch: error %d writing error file, write returns %lld\n", errno, (long long) wrRet); } - free(rendered); cJSON_Delete(errRoot); *pReplyRoot = NULL; /* tell caller not to delete once again! */ finalize_it: - if(rendered != NULL) - free(rendered); + if(bMutLocked) + pthread_mutex_unlock(&pData->mutErrFile); + free(rendered); RETiRet; } static inline rsRetVal -checkResultBulkmode(instanceData *pData, cJSON *root) +checkResultBulkmode(wrkrInstanceData_t *pWrkrData, cJSON *root) { int i; int numitems; @@ -509,7 +559,7 @@ checkResultBulkmode(instanceData *pData, cJSON *root) if(items == NULL || items->type != cJSON_Array) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " "bulkmode insert does not return array, reply is: %s\n", - pData->reply); + pWrkrData->reply); ABORT_FINALIZE(RS_RET_DATAFAIL); } numitems = cJSON_GetArraySize(items); @@ -527,10 +577,10 @@ DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); "cannot obtain 'create' item for #%d\n", i); ABORT_FINALIZE(RS_RET_DATAFAIL); } - ok = cJSON_GetObjectItem(create, "ok"); - if(ok == NULL || ok->type != cJSON_True) { + ok = cJSON_GetObjectItem(create, "status"); + if(ok == NULL || ok->type != cJSON_Number || ok->valueint < 0 || ok->valueint > 299) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: " - "item %d, prop ok (%p) not ok\n", i, ok); + "item %d, status is %d\n", i, ok->valueint); ABORT_FINALIZE(RS_RET_DATAFAIL); } } @@ -541,23 +591,25 @@ finalize_it: static inline rsRetVal -checkResult(instanceData *pData, uchar *reqmsg) +checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) { cJSON *root; - cJSON *ok; + cJSON *status; DEFiRet; - root = cJSON_Parse(pData->reply); + root = cJSON_Parse(pWrkrData->reply); if(root == NULL) { DBGPRINTF("omelasticsearch: could not parse JSON result \n"); ABORT_FINALIZE(RS_RET_ERR); } - if(pData->bulkmode) { - iRet = checkResultBulkmode(pData, root); + if(pWrkrData->pData->bulkmode) { + iRet = checkResultBulkmode(pWrkrData, root); } else { - ok = cJSON_GetObjectItem(root, "ok"); - if(ok == NULL || ok->type != cJSON_True) { + status = cJSON_GetObjectItem(root, "status"); + /* as far as we know, no "status" means all went well */ + if(status != NULL && + (status->type == cJSON_Number || status->valueint >= 0 || status->valueint <= 299)) { iRet = RS_RET_DATAFAIL; } } @@ -566,31 +618,35 @@ checkResult(instanceData *pData, uchar *reqmsg) * these in any case. */ if(iRet == RS_RET_DATAFAIL) { - writeDataError(pData, &root, reqmsg); + STATSCOUNTER_INC(indexESFail, mutIndexESFail); + writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg); iRet = RS_RET_OK; /* we have handled the problem! */ } finalize_it: if(root != NULL) cJSON_Delete(root); + if(iRet != RS_RET_OK) { + STATSCOUNTER_INC(indexESFail, mutIndexESFail); + } RETiRet; } static rsRetVal -curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) +curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, int nmsgs) { CURLcode code; - CURL *curl = pData->curlHandle; + CURL *curl = pWrkrData->curlHandle; DEFiRet; - pData->reply = NULL; - pData->replyLen = 0; + pWrkrData->reply = NULL; + pWrkrData->replyLen = 0; - if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent) - CHKiRet(setCurlURL(pData, tpls)); + if(pWrkrData->pData->dynSrchIdx || pWrkrData->pData->dynSrchType || pWrkrData->pData->dynParent) + CHKiRet(setCurlURL(pWrkrData, pWrkrData->pData, tpls)); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message); curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen); code = curl_easy_perform(curl); @@ -599,57 +655,67 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls) case CURLE_COULDNT_RESOLVE_PROXY: case CURLE_COULDNT_CONNECT: case CURLE_WRITE_ERROR: - STATSCOUNTER_INC(indexConFail, mutIndexConFail); + STATSCOUNTER_INC(indexHTTPReqFail, mutIndexHTTPReqFail); + indexHTTPFail += nmsgs; DBGPRINTF("omelasticsearch: we are suspending ourselfs due " "to failure %lld of curl_easy_perform()\n", (long long) code); ABORT_FINALIZE(RS_RET_SUSPENDED); default: - STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); break; } - pData->reply[pData->replyLen] = '\0'; /* byte has been reserved in malloc */ - DBGPRINTF("omelasticsearch: es reply: '%s'\n", pData->reply); + DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen); + if(pWrkrData->replyLen > 0) { + pWrkrData->reply[pWrkrData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */ + } + DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply); - CHKiRet(checkResult(pData, message)); + CHKiRet(checkResult(pWrkrData, message)); finalize_it: - free(pData->reply); + free(pWrkrData->reply); RETiRet; } BEGINbeginTransaction CODESTARTbeginTransaction -dbgprintf("omelasticsearch: beginTransaction\n"); - if(!pData->bulkmode) { +dbgprintf("omelasticsearch: beginTransaction, pWrkrData %p, pData %p\n", pWrkrData, pWrkrData->pData); + if(!pWrkrData->pData->bulkmode) { FINALIZE; } - es_emptyStr(pData->batch.data); + es_emptyStr(pWrkrData->batch.data); + pWrkrData->batch.nmemb = 0; finalize_it: ENDbeginTransaction BEGINdoAction CODESTARTdoAction - if(pData->bulkmode) { - CHKiRet(buildBatch(pData, ppString[0], ppString)); + STATSCOUNTER_INC(indexSubmit, mutIndexSubmit); + if(pWrkrData->pData->bulkmode) { + CHKiRet(buildBatch(pWrkrData, ppString[0], ppString)); } else { - CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]), - ppString)); + CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]), + ppString, 1)); } finalize_it: -dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode); +dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pWrkrData->pData->bulkmode); ENDdoAction BEGINendTransaction - char *cstr; + char *cstr = NULL; CODESTARTendTransaction dbgprintf("omelasticsearch: endTransaction init\n"); - cstr = es_str2cstr(pData->batch.data, NULL); - dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); - CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL)); + /* End Transaction only if batch data is not empty */ + if (pWrkrData->batch.data != NULL ) { + cstr = es_str2cstr(pWrkrData->batch.data, NULL); + dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr); + CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb)); + } + else + dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, nothing to send. \n"); finalize_it: free(cstr); dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet); @@ -660,24 +726,24 @@ size_t curlResult(void *ptr, size_t size, size_t nmemb, void *userdata) { char *p = (char *)ptr; - instanceData *pData = (instanceData*) userdata; + wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) userdata; char *buf; size_t newlen; - newlen = pData->replyLen + size*nmemb; - if((buf = realloc(pData->reply, newlen + 1)) == NULL) { + newlen = pWrkrData->replyLen + size*nmemb; + if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) { DBGPRINTF("omelasticsearch: realloc failed in curlResult\n"); return 0; /* abort due to failure */ } - memcpy(buf+pData->replyLen, p, size*nmemb); - pData->replyLen = newlen; - pData->reply = buf; + memcpy(buf+pWrkrData->replyLen, p, size*nmemb); + pWrkrData->replyLen = newlen; + pWrkrData->reply = buf; return size*nmemb; } static rsRetVal -curlSetup(instanceData *pData) +curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData) { HEADER *header; CURL *handle; @@ -693,13 +759,13 @@ curlSetup(instanceData *pData) curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult); curl_easy_setopt(handle, CURLOPT_POST, 1); - pData->curlHandle = handle; - pData->postHeader = header; + pWrkrData->curlHandle = handle; + pWrkrData->postHeader = header; if( pData->bulkmode || (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) { /* in this case, we know no tpls are involved in the request-->NULL OK! */ - setCurlURL(pData, NULL); + setCurlURL(pWrkrData, pData, NULL); } if(Debug) { @@ -726,6 +792,7 @@ setInstParamDefaults(instanceData *pData) pData->dynSrchType = 0; pData->dynParent = 0; pData->asyncRepl = 0; + pData->useHttps = 0; pData->bulkmode = 0; pData->tplName = NULL; pData->errorFile = NULL; @@ -776,6 +843,8 @@ CODESTARTnewActInst pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "asyncrepl")) { pData->asyncRepl = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "usehttps")) { + pData->useHttps = pvals[i].val.d.n; } else if(!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) { @@ -819,16 +888,6 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - if(pData->bulkmode) { - pData->batch.currTpl1 = NULL; - pData->batch.currTpl2 = NULL; - if((pData->batch.data = es_newStr(1024)) == NULL) { - DBGPRINTF("omelasticsearch: error creating batch string " - "turned off bulk mode\n"); - pData->bulkmode = 0; /* at least it works */ - } - } - iNumTpls = 1; if(pData->dynSrchIdx) ++iNumTpls; if(pData->dynSrchType) ++iNumTpls; @@ -920,9 +979,6 @@ CODESTARTnewActInst pData->searchIndex = (uchar*) strdup("system"); if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); - - CHKiRet(curlSetup(pData)); - CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst @@ -960,6 +1016,7 @@ ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES +CODEqueryEtryPt_STD_OMOD8_QUERIES CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_doHUP @@ -981,15 +1038,19 @@ CODEmodInit_QueryRegCFSLineHdlr /* support statistics gathering */ CHKiRet(statsobj.Construct(&indexStats)); - CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch")); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail", - ctrType_IntCtr, &indexConFail)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits", - ctrType_IntCtr, &indexSubmit)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed", - ctrType_IntCtr, &indexFailed)); - CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success", - ctrType_IntCtr, &indexSuccess)); + CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch")); + STATSCOUNTER_INIT(indexSubmit, mutIndexSubmit); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSubmit)); + STATSCOUNTER_INIT(indexHTTPFail, mutIndexHTTPFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPFail)); + STATSCOUNTER_INIT(indexHTTPReqFail, mutIndexHTTPReqFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.httprequests", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPReqFail)); + STATSCOUNTER_INIT(indexESFail, mutIndexESFail); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail)); CHKiRet(statsobj.ConstructFinalize(indexStats)); ENDmodInit |