summaryrefslogtreecommitdiff
path: root/plugins/omrabbitmq/omrabbitmq.c
blob: 7ea7793da2a59228aca963aeaad3a8906a9c042c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
/* omrabbitmq.c
 *
 * This output plugin enables rsyslog to send messages to the RabbitMQ.
 *
 * Copyright 2012-2013 Vaclav Tomec
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program.  If not, see
 * <http://www.gnu.org/licenses/>.
 * 
 * Author: Vaclav Tomec
 * <vaclav.tomec@gmail.com>
 */
#include "config.h"
#include "rsyslog.h"
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
#include "template.h"
#include "module-template.h"
#include "errmsg.h"
#include "cfsysline.h"

#include <amqp.h>

MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omrabbitmq")


/*
 * internal structures
 */
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)


typedef struct _instanceData {
	/* here you need to define all action-specific data. A record of type 
	 * instanceData will be handed over to each instance of the action. Keep
	 * in mind that there may be several invocations of the same type of action
	 * inside rsyslog.conf, and this is what keeps them apart. Do NOT use
	 * static data for this!
	 */
	amqp_connection_state_t conn;
	amqp_basic_properties_t props;
	uchar *host;
	int port;
	uchar *vhost;
	uchar *user;
	uchar *password;
	uchar *exchange;
	uchar *routing_key;
	uchar *tplName;
} instanceData;


/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
	{ "host", eCmdHdlrGetWord, 0 },
	{ "port", eCmdHdlrInt, 0 },
	{ "virtual_host", eCmdHdlrGetWord, 0 },
	{ "user", eCmdHdlrGetWord, 0 },
	{ "password", eCmdHdlrGetWord, 0 },
	{ "exchange", eCmdHdlrGetWord, 0 },
	{ "routing_key", eCmdHdlrGetWord, 0 },
	{ "template", eCmdHdlrGetWord, 0 }
};
static struct cnfparamblk actpblk =
	{
		CNFPARAMBLK_VERSION,
		sizeof(actpdescr)/sizeof(struct cnfparamdescr),
		actpdescr
	};


/*
 * Report general error
 */
static int
die_on_error(int x, char const *context)
{
	int retVal = 0; // false

	if (x < 0) {
		char *errstr = amqp_error_string(-x);
		errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, errstr);
		free(errstr);
		retVal = 1; // true
	}

	return retVal;
}


/*
 * Report AMQP specific error
 */
static int
die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
{
	int retVal = 1; // true

	switch (x.reply_type) {
	case AMQP_RESPONSE_NORMAL:
		retVal = 0; // false
		break;

	case AMQP_RESPONSE_NONE:
		errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: missing RPC reply type!", context);
		break;

	case AMQP_RESPONSE_LIBRARY_EXCEPTION:
		errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: %s", context, amqp_error_string(x.library_error));
		break;

	case AMQP_RESPONSE_SERVER_EXCEPTION:
		switch (x.reply.id) {
		case AMQP_CONNECTION_CLOSE_METHOD: {
			amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
			errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server connection error %d, message: %.*s",
				context,
				m->reply_code,
				(int) m->reply_text.len, (char *) m->reply_text.bytes);
			break;
			}
		case AMQP_CHANNEL_CLOSE_METHOD: {
			amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
			errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: server channel error %d, message: %.*s",
				context,
				m->reply_code,
				(int) m->reply_text.len, (char *) m->reply_text.bytes);
			break;
			}
		default:
			errmsg.LogError(0, RS_RET_ERR, "omrabbitmq: %s: unknown server error, method id 0x%08X\n", context, x.reply.id);
			break;
		}
		break;

	}

	return retVal;
}


static amqp_bytes_t
cstring_bytes(const char *str)
{
	return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
}


static void
closeAMQPConnection(instanceData *pData)
{
	if (pData->conn != NULL) {
		die_on_amqp_error(amqp_channel_close(pData->conn, 1, AMQP_REPLY_SUCCESS), "amqp_channel_close");
		die_on_amqp_error(amqp_connection_close(pData->conn, AMQP_REPLY_SUCCESS), "amqp_connection_close");
		die_on_error(amqp_destroy_connection(pData->conn), "amqp_destroy_connection");

		pData->conn = NULL;
	}
}


/*
 * Initialize RabbitMQ connection
 */
static rsRetVal
initRabbitMQ(instanceData *pData)
{
	int sockfd;
	DEFiRet;

	DBGPRINTF("omrabbitmq: trying connect to '%s' at port %d\n", pData->host, pData->port);
        
	pData->conn = amqp_new_connection();

	if (die_on_error(sockfd = amqp_open_socket((char*) pData->host, pData->port), "Opening socket")) {
		pData->conn = NULL;
		ABORT_FINALIZE(RS_RET_SUSPENDED);
	}

	amqp_set_sockfd(pData->conn, sockfd);

	if (die_on_amqp_error(amqp_login(pData->conn, (char*) pData->vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, pData->user, pData->password),
		"Logging in")) {
		pData->conn = NULL;
		ABORT_FINALIZE(RS_RET_SUSPENDED);
	}

	amqp_channel_open(pData->conn, 1);

	if (die_on_amqp_error(amqp_get_rpc_reply(pData->conn), "Opening channel")) {
		pData->conn = NULL;
		ABORT_FINALIZE(RS_RET_SUSPENDED);
	}

finalize_it:
	RETiRet;
}


BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance


BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
	/* use this to specify if select features are supported by this
	 * plugin. If not, the framework will handle that. Currently, only
	 * RepeatedMsgReduction ("last message repeated n times") is optional.
	 */
	if(eFeat == sFEATURERepeatedMsgReduction)
		iRet = RS_RET_OK;
ENDisCompatibleWithFeature


BEGINfreeInstance
CODESTARTfreeInstance
	/* this is a cleanup callback. All dynamically-allocated resources
	 * in instance data must be cleaned up here. Prime examples are
	 * malloc()ed memory, file & database handles and the like.
	 */
	closeAMQPConnection(pData);
	free(pData->host);
	free(pData->vhost);
	free(pData->user);
	free(pData->password);
	free(pData->exchange);
	free(pData->routing_key);
	free(pData->tplName);
ENDfreeInstance


BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
	/* permits to spit out some debug info */
	dbgprintf("omrabbitmq\n");
	dbgprintf("\thost='%s'\n", pData->host);
	dbgprintf("\tport=%d\n", pData->port);
	dbgprintf("\tvirtual_host='%s'\n", pData->vhost);
	dbgprintf("\tuser='%s'\n", pData->user == NULL ? (uchar*)"(not configured)" : pData->user);
	dbgprintf("\tpassword=(%sconfigured)\n", pData->password == NULL ? "not " : "");
	dbgprintf("\texchange='%s'\n", pData->exchange);
	dbgprintf("\trouting_key='%s'\n", pData->routing_key);
	dbgprintf("\ttemplate='%s'\n", pData->tplName);
ENDdbgPrintInstInfo


BEGINtryResume
CODESTARTtryResume
	/* this is called when an action has been suspended and the
	 * rsyslog core tries to resume it. The action must then
	 * retry (if possible) and report RS_RET_OK if it succeeded
	 * or RS_RET_SUSPENDED otherwise.
	 * Note that no data can be written in this callback, as it is
	 * not present. Prime examples of what can be retried are
	 * reconnects to remote hosts, reconnects to database,
	 * opening of files and the like.
	 * If there is no retry-type of operation, the action may
	 * return RS_RET_OK, so that it will get called on its doAction
	 * entry point (where it receives data), retries there, and
	 * immediately returns RS_RET_SUSPENDED if that does not work
	 * out. This disables some optimizations in the core's retry logic,
	 * but is a valid and expected behaviour. Note that it is also OK
	 * for the retry entry point to return OK but the immediately following
	 * doAction call to fail. In real life, for example, a buggy com line
	 * may cause such behaviour.
	 * Note that there is no guarantee that the core will very quickly
	 * call doAction after the retry succeeded. Today, it does, but that may
	 * not always be the case.
	 */

	if (pData->conn == NULL) {
		iRet = initRabbitMQ(pData);
	}

ENDtryResume


BEGINdoAction
CODESTARTdoAction
	/* this is where you receive the message and need to carry out the
	 * action. Data is provided in ppString[i] where 0 <= i <= num of strings
	 * requested.
	 * Return RS_RET_OK if all goes well, RS_RET_SUSPENDED if the action can
	 * currently not complete, or an error code or RS_RET_DISABLED. The later
	 * two should only be returned if there is no hope that the action can be
	 * restored unless an rsyslog restart (prime example is an invalid config).
	 * Error code or RS_RET_DISABLED permanently disables the action, up to
	 * the next restart.
	 */

	amqp_bytes_t body_bytes;

	if (pData->conn == NULL) {
		CHKiRet(initRabbitMQ(pData));
	}

	body_bytes = amqp_cstring_bytes((char *)ppString[0]);

	if (die_on_error(amqp_basic_publish(pData->conn, 1,
			cstring_bytes((char *) pData->exchange),
			cstring_bytes((char *) pData->routing_key),
			0, 0, &pData->props, body_bytes), "amqp_basic_publish")) {
		closeAMQPConnection(pData);
		ABORT_FINALIZE(RS_RET_SUSPENDED);
	}

finalize_it:

ENDdoAction


static inline void
setInstParamDefaults(instanceData *pData)
{
	pData->host = NULL;
	pData->port = 5672;
	pData->vhost = NULL;
	pData->user = NULL;
	pData->password = NULL;
	pData->exchange = NULL;
	pData->routing_key = NULL;
	pData->tplName = NULL;
}


BEGINnewActInst
	struct cnfparamvals *pvals;
	int i;
CODESTARTnewActInst

	if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
	}

	CHKiRet(createInstance(&pData));
	setInstParamDefaults(pData);

	CODE_STD_STRING_REQUESTparseSelectorAct(1)

	for(i = 0 ; i < actpblk.nParams ; ++i) {
		if (!pvals[i].bUsed)
			continue;
		if (!strcmp(actpblk.descr[i].name, "host")) {
			pData->host = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if (!strcmp(actpblk.descr[i].name, "port")) {
			pData->port = (int) pvals[i].val.d.n;
		} else if (!strcmp(actpblk.descr[i].name, "virtual_host")) {
			pData->vhost = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if (!strcmp(actpblk.descr[i].name, "user")) {
			pData->user = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if (!strcmp(actpblk.descr[i].name, "password")) {
			pData->password = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if (!strcmp(actpblk.descr[i].name, "exchange")) {
			pData->exchange = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if (!strcmp(actpblk.descr[i].name, "routing_key")) {
			pData->routing_key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else if (!strcmp(actpblk.descr[i].name, "template")) {
			pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
		} else {
			dbgprintf("omrabbitmq: program error, non-handled param '%s'\n", actpblk.descr[i].name);
		}
	}

	if (pData->host == NULL) {
		errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter host must be specified");
		ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
	}

	if (pData->vhost == NULL) {
		errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter virtual_host must be specified");
		ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
	}

	if (pData->user == NULL) {
		errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter user must be specified");
		ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
	}

	if (pData->password == NULL) {
		errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter password must be specified");
		ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
	}

	if (pData->exchange == NULL) {
		errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter exchange must be specified");
		ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
	}

	if (pData->routing_key == NULL) {
		errmsg.LogError(0, RS_RET_INVALID_PARAMS, "omrabbitmq module disabled: parameter routing_key must be specified");
		ABORT_FINALIZE(RS_RET_INVALID_PARAMS);
	}
 
	// RabbitMQ properties initialization
	memset(&pData->props, 0, sizeof pData->props);
	pData->props._flags = AMQP_BASIC_DELIVERY_MODE_FLAG;
	pData->props.delivery_mode = 2; /* persistent delivery mode */
	pData->props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
	pData->props.content_type = amqp_cstring_bytes("application/json");

	CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
					    " StdJSONFmt" : (char*)pData->tplName),
		OMSR_NO_RQD_TPL_OPTS));

CODE_STD_FINALIZERnewActInst
	cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst


BEGINparseSelectorAct
CODESTARTparseSelectorAct
	CODE_STD_STRING_REQUESTparseSelectorAct(1)
	if(!strncmp((char*) p, ":omrabbitmq:", sizeof(":omrabbitmq:") - 1)) {
		errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
			"omrabbitmq supports only v6 config format, use: "
			"action(type=\"omrabbitmq\" host=...)");
	}
	ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct


BEGINmodExit
CODESTARTmodExit
	objRelease(errmsg, CORE_COMPONENT);
ENDmodExit


BEGINqueryEtryPt
CODESTARTqueryEtryPt
	CODEqueryEtryPt_STD_OMOD_QUERIES
	CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
ENDqueryEtryPt


BEGINmodInit()
CODESTARTmodInit
	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
	CHKiRet(objUse(errmsg, CORE_COMPONENT));
ENDmodInit