/* omrabbitmq.c * * This output plugin enables rsyslog to send messages to the RabbitMQ. * * Copyright 2012-2013 Vaclav Tomec * Copyright 2014 Rainer Gerhards * * This program is free software: you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation, either version 3 of * the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this program. If not, see * . * * Author: Vaclav Tomec * */ #include "config.h" #include "rsyslog.h" #include #include #include #include #include #include #include #include #include "conf.h" #include "syslogd-types.h" #include "srUtils.h" #include "template.h" #include "module-template.h" #include "errmsg.h" #include "cfsysline.h" #include MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP MODULE_CNFNAME("omrabbitmq") /* * internal structures */ DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) static pthread_mutex_t mutDoAct = PTHREAD_MUTEX_INITIALIZER; typedef struct _instanceData { /* here you need to define all action-specific data. A record of type * instanceData will be handed over to each instance of the action. Keep * in mind that there may be several invocations of the same type of action * inside rsyslog.conf, and this is what keeps them apart. Do NOT use * static data for this! */ amqp_connection_state_t conn; amqp_basic_properties_t props; uchar *host; int port; uchar *vhost; uchar *user; uchar *password; uchar *exchange; uchar *routing_key; uchar *tplName; } instanceData; typedef struct wrkrInstanceData { instanceData *pData; } wrkrInstanceData_t; /* tables for interfacing with the v6 config system */ /* action (instance) parameters */ static struct cnfparamdescr actpdescr[] = { { "host", eCmdHdlrGetWord, 0 }, { "port", eCmdHdlrInt, 0 }, { "virtual_host", eCmdHdlrGetWord, 0 }, { "user", eCmdHdlrGetWord, 0 }, { "password", eCmdHdlrGetWord, 0 }, { "exchange", eCmdHdlrGetWord, 0 }, { "routing_key", eCmdHdlrGetWord, 0 }, { "template", eCmdHdlrGetWord, 0 } }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, sizeof(actpdescr)/sizeof(struct cnfparamdescr), actpdescr }; /* * Report general error */ static int die_on_error(int x, char const *context) { int retVal = 0; // false if (x < 0) { char *errstr = amqp_error_string(-x); errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, errstr); free(errstr); retVal = 1; // true } return retVal; } /* * Report AMQP specific error */ static int die_on_amqp_error(amqp_rpc_reply_t x, char const *context) { int retVal = 1; // true switch (x.reply_type) { case AMQP_RESPONSE_NORMAL: retVal = 0; // false break; case AMQP_RESPONSE_NONE: errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: missing RPC reply type!", context); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, amqp_error_string(x.library_error)); break; case AMQP_RESPONSE_SERVER_EXCEPTION: switch (x.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded; errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server connection error %d, message: %.*s", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server channel error %d, message: %.*s", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); break; } default: errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: unknown server error, method id 0x%08X\n", context, x.reply.id); break; } break; } return retVal; } static amqp_bytes_t cstring_bytes(const char *str) { return str ? amqp_cstring_bytes(str) : amqp_empty_bytes; } static void closeAMQPConnection(instanceData *pData) { if (pData->conn != NULL) { die_on_amqp_error(amqp_channel_close(pData->conn, 1, AMQP_REPLY_SUCCESS), "amqp_channel_close"); die_on_amqp_error(amqp_connection_close(pData->conn, AMQP_REPLY_SUCCESS), "amqp_connection_close"); die_on_error(amqp_destroy_connection(pData->conn), "amqp_destroy_connection"); pData->conn = NULL; } } /* * Initialize RabbitMQ connection */ static rsRetVal initRabbitMQ(instanceData *pData) { int sockfd; DEFiRet; DBGPRINTF("omrabbitmq: trying connect to '%s' at port %d\n", pData->host, pData->port); pData->conn = amqp_new_connection(); if (die_on_error(sockfd = amqp_open_socket((char*) pData->host, pData->port), "Opening socket")) { pData->conn = NULL; ABORT_FINALIZE(RS_RET_SUSPENDED); } amqp_set_sockfd(pData->conn, sockfd); if (die_on_amqp_error(amqp_login(pData->conn, (char*) pData->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, pData->user, pData->password), "Logging in")) { pData->conn = NULL; ABORT_FINALIZE(RS_RET_SUSPENDED); } amqp_channel_open(pData->conn, 1); if (die_on_amqp_error(amqp_get_rpc_reply(pData->conn), "Opening channel")) { pData->conn = NULL; ABORT_FINALIZE(RS_RET_SUSPENDED); } finalize_it: RETiRet; } BEGINcreateInstance CODESTARTcreateInstance ENDcreateInstance BEGINcreateWrkrInstance CODESTARTcreateWrkrInstance ENDcreateWrkrInstance BEGINisCompatibleWithFeature CODESTARTisCompatibleWithFeature /* use this to specify if select features are supported by this * plugin. If not, the framework will handle that. Currently, only * RepeatedMsgReduction ("last message repeated n times") is optional. */ if(eFeat == sFEATURERepeatedMsgReduction) iRet = RS_RET_OK; ENDisCompatibleWithFeature BEGINfreeInstance CODESTARTfreeInstance /* this is a cleanup callback. All dynamically-allocated resources * in instance data must be cleaned up here. Prime examples are * malloc()ed memory, file & database handles and the like. */ closeAMQPConnection(pData); free(pData->host); free(pData->vhost); free(pData->user); free(pData->password); free(pData->exchange); free(pData->routing_key); free(pData->tplName); ENDfreeInstance BEGINfreeWrkrInstance CODESTARTfreeWrkrInstance ENDfreeWrkrInstance BEGINdbgPrintInstInfo CODESTARTdbgPrintInstInfo /* permits to spit out some debug info */ dbgprintf("omrabbitmq\n"); dbgprintf("\thost='%s'\n", pData->host); dbgprintf("\tport=%d\n", pData->port); dbgprintf("\tvirtual_host='%s'\n", pData->vhost); dbgprintf("\tuser='%s'\n", pData->user == NULL ? (uchar*)"(not configured)" : pData->user); dbgprintf("\tpassword=(%sconfigured)\n", pData->password == NULL ? "not " : ""); dbgprintf("\texchange='%s'\n", pData->exchange); dbgprintf("\trouting_key='%s'\n", pData->routing_key); dbgprintf("\ttemplate='%s'\n", pData->tplName); ENDdbgPrintInstInfo BEGINtryResume instanceData *pData = pWrkrData->pData; CODESTARTtryResume /* this is called when an action has been suspended and the * rsyslog core tries to resume it. The action must then * retry (if possible) and report RS_RET_OK if it succeeded * or RS_RET_SUSPENDED otherwise. * Note that no data can be written in this callback, as it is * not present. Prime examples of what can be retried are * reconnects to remote hosts, reconnects to database, * opening of files and the like. * If there is no retry-type of operation, the action may * return RS_RET_OK, so that it will get called on its doAction * entry point (where it receives data), retries there, and * immediately returns RS_RET_SUSPENDED if that does not work * out. This disables some optimizations in the core's retry logic, * but is a valid and expected behaviour. Note that it is also OK * for the retry entry point to return OK but the immediately following * doAction call to fail. In real life, for example, a buggy com line * may cause such behaviour. * Note that there is no guarantee that the core will very quickly * call doAction after the retry succeeded. Today, it does, but that may * not always be the case. */ pthread_mutex_lock(&mutDoAct); if (pData->conn == NULL) { iRet = initRabbitMQ(pData); } pthread_mutex_unlock(&mutDoAct); ENDtryResume BEGINdoAction instanceData *pData = pWrkrData->pData; CODESTARTdoAction /* this is where you receive the message and need to carry out the * action. Data is provided in ppString[i] where 0 <= i <= num of strings * requested. * Return RS_RET_OK if all goes well, RS_RET_SUSPENDED if the action can * currently not complete, or an error code or RS_RET_DISABLED. The later * two should only be returned if there is no hope that the action can be * restored unless an rsyslog restart (prime example is an invalid config). * Error code or RS_RET_DISABLED permanently disables the action, up to * the next restart. */ amqp_bytes_t body_bytes; pthread_mutex_lock(&mutDoAct); if (pData->conn == NULL) { CHKiRet(initRabbitMQ(pData)); } body_bytes = amqp_cstring_bytes((char *)ppString[0]); if (die_on_error(amqp_basic_publish(pData->conn, 1, cstring_bytes((char *) pData->exchange), cstring_bytes((char *) pData->routing_key), 0, 0, &pData->props, body_bytes), "amqp_basic_publish")) { closeAMQPConnection(pData); ABORT_FINALIZE(RS_RET_SUSPENDED); } finalize_it: pthread_mutex_unlock(&mutDoAct); ENDdoAction static inline void setInstParamDefaults(instanceData *pData) { pData->host = NULL; pData->port = 5672; pData->vhost = NULL; pData->user = NULL; pData->password = NULL; pData->exchange = NULL; pData->routing_key = NULL; pData->tplName = NULL; } BEGINnewActInst struct cnfparamvals *pvals; int i; CODESTARTnewActInst if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) { ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS); } CHKiRet(createInstance(&pData)); setInstParamDefaults(pData); CODE_STD_STRING_REQUESTparseSelectorAct(1) for(i = 0 ; i < actpblk.nParams ; ++i) { if (!pvals[i].bUsed) continue; if (!strcmp(actpblk.descr[i].name, "host")) { pData->host = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(actpblk.descr[i].name, "port")) { pData->port = (int) pvals[i].val.d.n; } else if (!strcmp(actpblk.descr[i].name, "virtual_host")) { pData->vhost = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(actpblk.descr[i].name, "user")) { pData->user = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(actpblk.descr[i].name, "password")) { pData->password = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(actpblk.descr[i].name, "exchange")) { pData->exchange = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(actpblk.descr[i].name, "routing_key")) { pData->routing_key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else if (!strcmp(actpblk.descr[i].name, "template")) { pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omrabbitmq: program error, non-handled param '%s'\n", actpblk.descr[i].name); } } if (pData->host == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter host must be specified"); ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } if (pData->vhost == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter virtual_host must be specified"); ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } if (pData->user == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter user must be specified"); ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } if (pData->password == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter password must be specified"); ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } if (pData->exchange == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter exchange must be specified"); ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } if (pData->routing_key == NULL) { errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter routing_key must be specified"); ABORT_FINALIZE(RS_RET_INVALID_PARAMS); } // RabbitMQ properties initialization memset(&pData->props, 0, sizeof pData->props); pData->props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG; pData->props.delivery_mode = 2; /* persistent delivery mode */ pData->props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; pData->props.content_type = amqp_cstring_bytes("application/json"); CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? " StdJSONFmt" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); ENDnewActInst BEGINparseSelectorAct CODESTARTparseSelectorAct CODE_STD_STRING_REQUESTparseSelectorAct(1) if(!strncmp((char*) p, ":omrabbitmq:", sizeof(":omrabbitmq:") - 1)) { errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED, "omrabbitmq supports only v6 config format, use: " "action(type=\"omrabbitmq\" host=...)"); } ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED); CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct BEGINmodExit CODESTARTmodExit objRelease(errmsg, CORE_COMPONENT); ENDmodExit BEGINqueryEtryPt CODESTARTqueryEtryPt CODEqueryEtryPt_STD_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_STD_OMOD8_QUERIES ENDqueryEtryPt BEGINmodInit() CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); ENDmodInit