summaryrefslogtreecommitdiff
path: root/plugins/omelasticsearch/omelasticsearch.c
diff options
context:
space:
mode:
authorMichael Biebl <biebl@debian.org>2014-03-11 19:04:23 +0100
committerMichael Biebl <biebl@debian.org>2014-03-11 19:04:23 +0100
commit5fccf771f37973a2cea543fbdf1ed94ed9706faf (patch)
tree80b547300bfb6f1c8ffae16a8af9dea03c3fb75d /plugins/omelasticsearch/omelasticsearch.c
parent0c656246c6818b9fd0426411ed1cf926fbbc907d (diff)
downloadrsyslog-5fccf771f37973a2cea543fbdf1ed94ed9706faf.tar.gz
Imported Upstream version 7.4.8upstream/7.4.8
Diffstat (limited to 'plugins/omelasticsearch/omelasticsearch.c')
-rw-r--r--plugins/omelasticsearch/omelasticsearch.c45
1 files changed, 28 insertions, 17 deletions
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index aea8e32..b82968d 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -4,7 +4,7 @@
* NOTE: read comments in module-template.h for more specifics!
*
* Copyright 2011 Nathan Scott.
- * Copyright 2009-2012 Rainer Gerhards and Adiscon GmbH.
+ * Copyright 2009-2013 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
@@ -58,10 +58,10 @@ DEFobjCurrIf(errmsg)
DEFobjCurrIf(statsobj)
statsobj_t *indexStats;
-STATSCOUNTER_DEF(indexConFail, mutIndexConFail)
STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
-STATSCOUNTER_DEF(indexFailed, mutIndexFailed)
-STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
+STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
+STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
+STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
/* REST API for elasticsearch hits this URL:
* http://<hostName>:<restPort>/<searchIndex>/<searchType>
@@ -91,6 +91,7 @@ typedef struct _instanceData {
sbool asyncRepl;
struct {
es_str_t *data;
+ int nmemb; /* number of messages in batch (for statistics counting) */
uchar *currTpl1;
uchar *currTpl2;
} batch;
@@ -432,6 +433,7 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r);
ABORT_FINALIZE(RS_RET_ERR);
}
+ ++pData->batch.nmemb;
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
@@ -577,12 +579,15 @@ checkResult(instanceData *pData, uchar *reqmsg)
finalize_it:
if(root != NULL)
cJSON_Delete(root);
+ if(iRet != RS_RET_OK) {
+ STATSCOUNTER_INC(indexESFail, mutIndexESFail);
+ }
RETiRet;
}
static rsRetVal
-curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
+curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs)
{
CURLcode code;
CURL *curl = pData->curlHandle;
@@ -603,13 +608,13 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls)
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_CONNECT:
case CURLE_WRITE_ERROR:
- STATSCOUNTER_INC(indexConFail, mutIndexConFail);
+ STATSCOUNTER_INC(indexHTTPReqFail, mutHTTPReqFail);
+ indexHTTPFail += nmsgs;
DBGPRINTF("omelasticsearch: we are suspending ourselfs due "
"to failure %lld of curl_easy_perform()\n",
(long long) code);
ABORT_FINALIZE(RS_RET_SUSPENDED);
default:
- STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
break;
}
@@ -633,17 +638,19 @@ dbgprintf("omelasticsearch: beginTransaction\n");
}
es_emptyStr(pData->batch.data);
+ pData->batch.nmemb = 0;
finalize_it:
ENDbeginTransaction
BEGINdoAction
CODESTARTdoAction
+ STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
if(pData->bulkmode) {
CHKiRet(buildBatch(pData, ppString[0], ppString));
} else {
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
- ppString));
+ ppString, 1));
}
finalize_it:
dbgprintf("omelasticsearch: result doAction: %d (bulkmode %d)\n", iRet, pData->bulkmode);
@@ -658,7 +665,7 @@ dbgprintf("omelasticsearch: endTransaction init\n");
if (pData->batch.data != NULL ) {
cstr = es_str2cstr(pData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
- CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL));
+ CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb));
}
else
dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n");
@@ -993,15 +1000,19 @@ CODEmodInit_QueryRegCFSLineHdlr
/* support statistics gathering */
CHKiRet(statsobj.Construct(&indexStats));
- CHKiRet(statsobj.SetName(indexStats, (uchar *)"elasticsearch"));
- CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"connfail",
- ctrType_IntCtr, &indexConFail));
- CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submits",
+ CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch"));
+ STATSCOUNTER_INIT(indexSubmit, mutCtrIndexSubmit);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted",
ctrType_IntCtr, &indexSubmit));
- CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed",
- ctrType_IntCtr, &indexFailed));
- CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"success",
- ctrType_IntCtr, &indexSuccess));
+ STATSCOUNTER_INIT(indexHTTPFail, mutCtrIndexHTTPFail);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http",
+ ctrType_IntCtr, &indexHTTPFail));
+ STATSCOUNTER_INIT(indexHTTPReqFail, mutCtrIndexHTTPReqFail);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.httprequests",
+ ctrType_IntCtr, &indexHTTPReqFail));
+ STATSCOUNTER_INIT(indexESFail, mutCtrIndexESFail);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
+ ctrType_IntCtr, &indexESFail));
CHKiRet(statsobj.ConstructFinalize(indexStats));
ENDmodInit