1 /* omelasticsearch.c
2  * This is the http://www.elasticsearch.org/ output module.
3  *
4  * NOTE: read comments in module-template.h for more specifics!
5  *
6  * Copyright 2011 Nathan Scott.
7  * Copyright 2009-2021 Rainer Gerhards and Adiscon GmbH.
8  *
9  * This file is part of rsyslog.
10  *
11  * Licensed under the Apache License, Version 2.0 (the "License");
12  * you may not use this file except in compliance with the License.
13  * You may obtain a copy of the License at
14  *
15  *       http://www.apache.org/licenses/LICENSE-2.0
16  *       -or-
17  *       see COPYING.ASL20 in the source distribution
18  *
19  * Unless required by applicable law or agreed to in writing, software
20  * distributed under the License is distributed on an "AS IS" BASIS,
21  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22  * See the License for the specific language governing permissions and
23  * limitations under the License.
24  */
25 #include "config.h"
26 #include "rsyslog.h"
27 #include <stdio.h>
28 #include <stdarg.h>
29 #include <stdlib.h>
30 #include <memory.h>
31 #include <string.h>
32 #include <curl/curl.h>
33 #include <curl/easy.h>
34 #include <assert.h>
35 #include <signal.h>
36 #include <errno.h>
37 #include <time.h>
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #if defined(__FreeBSD__)
42 #include <unistd.h>
43 #endif
44 #include <json.h>
45 #include "conf.h"
46 #include "syslogd-types.h"
47 #include "srUtils.h"
48 #include "template.h"
49 #include "module-template.h"
50 #include "errmsg.h"
51 #include "statsobj.h"
52 #include "cfsysline.h"
53 #include "unicode-helper.h"
54 #include "obj-types.h"
55 #include "ratelimit.h"
56 #include "ruleset.h"
57 
58 #ifndef O_LARGEFILE
59 #  define O_LARGEFILE 0
60 #endif
61 
/* module-template.h registration: declares this an output module with
 * MODULE_TYPE_NOKEEP and registers the config name "omelasticsearch".
 */
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omelasticsearch")

/* internal structures */
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(statsobj)
DEFobjCurrIf(prop)
DEFobjCurrIf(ruleset)

/* Module-wide statistics counters. They live at file scope, so they are
 * shared by every omelasticsearch action rather than kept per action.
 * The second STATSCOUNTER_DEF argument names the counter's companion
 * (presumably mutex) object. Example: checkConnFail is incremented by
 * checkConn() for every failed health check.
 */
statsobj_t *indexStats;
STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
STATSCOUNTER_DEF(rebinds, mutRebinds)

/* Input-name property object. NOTE(review): presumably attached to
 * messages this module re-submits (retry ruleset) -- confirm at its users.
 */
static prop_t *pInputName = NULL;
87 
/* Fragments of one bulk-API action/metadata line. buildBatch() joins them
 * as: START (or START_CREATE) <index> TYPE <type> [PARENT <parent>]
 * [PIPELINE <pipeline>] [ID <id>] END, and computeMessageSize() sums the
 * same pieces to predict the size.
 */
#define META_STRT		"{\"index\":{\"_index\": \""
#define META_STRT_CREATE	"{\"create\":{\"_index\": \""
#define META_TYPE		"\",\"_type\":\""
#define META_PIPELINE		"\",\"pipeline\":\""
#define META_PARENT		"\",\"_parent\":\""
#define META_ID			"\", \"_id\":\""
#define META_END		"\"}}\n"

/* Bulk write operation of an action. Only INDEX and CREATE are used when
 * building requests (CREATE selects META_STRT_CREATE, anything else
 * META_STRT).
 */
typedef enum {
	ES_WRITE_INDEX  = 0,
	ES_WRITE_CREATE = 1,
	ES_WRITE_UPDATE = 2,	/* not supported */
	ES_WRITE_UPSERT = 3	/* not supported */
} es_write_ops_t;

/* type tag for worker instance data, set by PTR_ASSERT_SET_TYPE and
 * verified by PTR_ASSERT_CHK */
#define WRKR_DATA_TYPE_ES 0xBADF0001

/* initial value of instanceData.rebindInterval */
#define DEFAULT_REBIND_INTERVAL (-1)
106 
/* REST API for elasticsearch hits this URL:
 * http://<hostName>:<restPort>/<searchIndex>/<searchType>
 */
/* bulk API uses /_bulk */
typedef struct curl_slist HEADER;
/* Per-action configuration. Instances are also chained through `next`
 * into loadModConf's list while a config load is in progress.
 */
typedef struct instanceConf_s {
	int defaultPort;	/* NOTE(review): presumably the port handed to computeBaseUrl() for servers without one */
	int fdErrFile;		/* error file fd or -1 if not open */
	pthread_mutex_t mutErrFile;	/* created in createInstance, destroyed in freeInstance */
	uchar **serverBaseUrls;	/* numServers base URLs; indexed by the worker's serverIndex */
	int numServers;
	long healthCheckTimeout;
	uchar *uid;
	uchar *pwd;
	uchar *authBuf;		/* NOTE(review): presumably pre-built credentials from uid/pwd -- confirm */
	uchar *searchIndex;	/* static index; overridden per message when dynSrchIdx is set */
	uchar *searchType;	/* static type; overridden per message when dynSrchType is set */
	uchar *pipelineName;	/* optional ingest pipeline (may be NULL) */
	sbool skipPipelineIfEmpty;	/* omit the pipeline when the resolved name is "" */
	uchar *parent;		/* optional parent id (may be NULL) */
	uchar *tplName;
	uchar *timeout;		/* if set, sent as the "timeout" URL query parameter */
	uchar *bulkId;		/* optional document id used in bulk metadata (may be NULL) */
	uchar *errorFile;
	sbool errorOnly;
	sbool interleaved;
	/* dyn* flags: take the value from the message's template strings
	 * instead of the static parameter (see getIndexTypeAndParent) */
	sbool dynSrchIdx;
	sbool dynSrchType;
	sbool dynParent;
	sbool dynBulkId;
	sbool dynPipelineName;
	sbool bulkmode;		/* use the _bulk endpoint; may be cleared by createWrkrInstance */
	size_t maxbytes;
	sbool useHttps;
	sbool allowUnsignedCerts;
	sbool skipVerifyHost;
	uchar *caCertFile;
	uchar *myCertFile;
	uchar *myPrivKeyFile;
	es_write_ops_t writeOperation;	/* ES_WRITE_CREATE selects "create" bulk actions, else "index" */
	sbool retryFailures;
	unsigned int ratelimitInterval;
	unsigned int ratelimitBurst;
	/* for retries */
	ratelimit_t *ratelimiter;
	uchar *retryRulesetName;
	ruleset_t *retryRuleset;
	int rebindInterval;	/* initialized to DEFAULT_REBIND_INTERVAL */
	struct instanceConf_s *next;	/* next instance in loadModConf's list */
} instanceData;
157 
/* Module configuration for a config load. root..tail is a singly-linked
 * list (chained via instanceData.next) of this module's action instances;
 * freeInstance() unlinks instances from it.
 */
struct modConfData_s {
	rsconf_t *pConf;		/* our overall config object */
	instanceConf_t *root, *tail;
};
static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
163 
/* Per-worker state. Initialized in createWrkrInstance and released in
 * freeWrkrInstance.
 */
typedef struct wrkrInstanceData {
	PTR_ASSERT_DEF
	instanceData *pData;
	int serverIndex;	/* index into pData->serverBaseUrls of the current server */
	int replyLen;		/* bytes currently stored in reply */
	size_t replyBufLen;	/* allocated size of reply */
	char *reply;		/* HTTP response body accumulated by curlResult() */
	CURL	*curlCheckConnHandle;	/* libcurl session handle for checking the server connection */
	CURL	*curlPostHandle;	/* libcurl session handle for posting data to the server */
	HEADER	*curlHeader;	/* json POST request info */
	uchar *restURL;		/* last used URL for error reporting */
	struct {
		es_str_t *data;	/* bulk request body being built (bulk mode only) */
		int nmemb;	/* number of messages in batch (for statistics counting) */
		uchar *currTpl1;	/* NOTE(review): purpose not visible in this chunk */
		uchar *currTpl2;	/* NOTE(review): purpose not visible in this chunk */
	} batch;
	int nOperations; /* counter used with rebindInterval */
} wrkrInstanceData_t;
183 
/* tables for interfacing with the v6 config system */
/* action (instance) parameters: parameter name as used in action(),
 * its value handler type, and a third field that is 0 for every entry.
 */
static struct cnfparamdescr actpdescr[] = {
	{ "server", eCmdHdlrArray, 0 },
	{ "serverport", eCmdHdlrInt, 0 },
	{ "healthchecktimeout", eCmdHdlrInt, 0 },
	{ "uid", eCmdHdlrGetWord, 0 },
	{ "pwd", eCmdHdlrGetWord, 0 },
	{ "searchindex", eCmdHdlrGetWord, 0 },
	{ "searchtype", eCmdHdlrGetWord, 0 },
	{ "pipelinename", eCmdHdlrGetWord, 0 },
	{ "skippipelineifempty", eCmdHdlrBinary, 0 },
	{ "parent", eCmdHdlrGetWord, 0 },
	{ "dynsearchindex", eCmdHdlrBinary, 0 },
	{ "dynsearchtype", eCmdHdlrBinary, 0 },
	{ "dynparent", eCmdHdlrBinary, 0 },
	{ "bulkmode", eCmdHdlrBinary, 0 },
	{ "maxbytes", eCmdHdlrSize, 0 },
	{ "asyncrepl", eCmdHdlrGoneAway, 0 },	/* retired parameter (GoneAway handler) */
	{ "usehttps", eCmdHdlrBinary, 0 },
	{ "timeout", eCmdHdlrGetWord, 0 },
	{ "errorfile", eCmdHdlrGetWord, 0 },
	{ "erroronly", eCmdHdlrBinary, 0 },
	{ "interleaved", eCmdHdlrBinary, 0 },
	{ "template", eCmdHdlrGetWord, 0 },
	{ "dynbulkid", eCmdHdlrBinary, 0 },
	{ "dynpipelinename", eCmdHdlrBinary, 0 },
	{ "bulkid", eCmdHdlrGetWord, 0 },
	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
	{ "skipverifyhost", eCmdHdlrBinary, 0 },
	{ "tls.cacert", eCmdHdlrString, 0 },
	{ "tls.mycert", eCmdHdlrString, 0 },
	{ "tls.myprivkey", eCmdHdlrString, 0 },
	{ "writeoperation", eCmdHdlrGetWord, 0 },
	{ "retryfailures", eCmdHdlrBinary, 0 },
	{ "ratelimit.interval", eCmdHdlrInt, 0 },
	{ "ratelimit.burst", eCmdHdlrInt, 0 },
	{ "retryruleset", eCmdHdlrString, 0 },
	{ "rebindinterval", eCmdHdlrInt, 0 }
};
/* parameter block: version, entry count of actpdescr, the table itself */
static struct cnfparamblk actpblk =
	{ CNFPARAMBLK_VERSION,
	  sizeof(actpdescr)/sizeof(struct cnfparamdescr),
	  actpdescr
	};

/* defined later in this file; createWrkrInstance uses it to set up the
 * worker's libcurl state */
static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData);
231 
232 BEGINcreateInstance
233 CODESTARTcreateInstance
234 	pData->fdErrFile = -1;
235 	if(pthread_mutex_init(&pData->mutErrFile, NULL) != 0) {
236 		LogError(errno, RS_RET_ERR, "omelasticsearch: cannot create "
237 			"error file mutex, failing this action");
238 		ABORT_FINALIZE(RS_RET_ERR);
239 	}
240 	pData->caCertFile = NULL;
241 	pData->myCertFile = NULL;
242 	pData->myPrivKeyFile = NULL;
243 	pData->ratelimiter = NULL;
244 	pData->retryRulesetName = NULL;
245 	pData->retryRuleset = NULL;
246 	pData->rebindInterval = DEFAULT_REBIND_INTERVAL;
247 finalize_it:
248 ENDcreateInstance
249 
250 BEGINcreateWrkrInstance
251 CODESTARTcreateWrkrInstance
252 	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
253 	pWrkrData->curlHeader = NULL;
254 	pWrkrData->curlPostHandle = NULL;
255 	pWrkrData->curlCheckConnHandle = NULL;
256 	pWrkrData->serverIndex = 0;
257 	pWrkrData->restURL = NULL;
258 	if(pData->bulkmode) {
259 		pWrkrData->batch.currTpl1 = NULL;
260 		pWrkrData->batch.currTpl2 = NULL;
261 		if((pWrkrData->batch.data = es_newStr(1024)) == NULL) {
262 			LogError(0, RS_RET_OUT_OF_MEMORY,
263 				"omelasticsearch: error creating batch string "
264 			        "turned off bulk mode\n");
265 			pData->bulkmode = 0; /* at least it works */
266 		}
267 	}
268 	pWrkrData->nOperations = 0;
269 	pWrkrData->reply = NULL;
270 	pWrkrData->replyLen = 0;
271 	pWrkrData->replyBufLen = 0;
272 	iRet = curlSetup(pWrkrData);
273 ENDcreateWrkrInstance
274 
/* Feature query: this module declares itself compatible only with
 * repeated-message reduction. For any other feature, iRet keeps the value
 * preset by the macro prologue (NOTE(review): presumably
 * RS_RET_INCOMPATIBLE -- see module-template.h).
 */
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
	if(eFeat == sFEATURERepeatedMsgReduction)
		iRet = RS_RET_OK;
ENDisCompatibleWithFeature
280 
281 BEGINfreeInstance
282 	int i;
283 CODESTARTfreeInstance
284 	if(pData->fdErrFile != -1)
285 		close(pData->fdErrFile);
286 
287 	if(loadModConf != NULL) {
288 		/* we keep our instances in our own internal list - this also
289 		 * means we need to cleanup that list, else we cause grief.
290 		 */
291 		instanceData *prev = NULL;
292 		for(instanceData *inst = loadModConf->root ; inst != NULL ; inst = inst->next) {
293 			if(inst == pData) {
294 				if(loadModConf->tail == inst) {
295 					loadModConf->tail = prev;
296 				}
297 				prev->next = inst->next;
298 				/* no need to correct inst back to prev - we exit now! */
299 				break;
300 			} else {
301 				prev = inst;
302 			}
303 		}
304 	}
305 
306 	pthread_mutex_destroy(&pData->mutErrFile);
307 	for(i = 0 ; i < pData->numServers ; ++i)
308 		free(pData->serverBaseUrls[i]);
309 	free(pData->serverBaseUrls);
310 	free(pData->uid);
311 	free(pData->pwd);
312 	free(pData->authBuf);
313 	free(pData->searchIndex);
314 	free(pData->searchType);
315 	free(pData->pipelineName);
316 	free(pData->parent);
317 	free(pData->tplName);
318 	free(pData->timeout);
319 	free(pData->errorFile);
320 	free(pData->bulkId);
321 	free(pData->caCertFile);
322 	free(pData->myCertFile);
323 	free(pData->myPrivKeyFile);
324 	free(pData->retryRulesetName);
325 	if (pData->ratelimiter != NULL)
326 		ratelimitDestruct(pData->ratelimiter);
327 ENDfreeInstance
328 
329 BEGINfreeWrkrInstance
330 CODESTARTfreeWrkrInstance
331 	if(pWrkrData->curlHeader != NULL) {
332 		curl_slist_free_all(pWrkrData->curlHeader);
333 		pWrkrData->curlHeader = NULL;
334 	}
335 	if(pWrkrData->curlCheckConnHandle != NULL) {
336 		curl_easy_cleanup(pWrkrData->curlCheckConnHandle);
337 		pWrkrData->curlCheckConnHandle = NULL;
338 	}
339 	if(pWrkrData->curlPostHandle != NULL) {
340 		curl_easy_cleanup(pWrkrData->curlPostHandle);
341 		pWrkrData->curlPostHandle = NULL;
342 	}
343 	if (pWrkrData->restURL != NULL) {
344 		free(pWrkrData->restURL);
345 		pWrkrData->restURL = NULL;
346 	}
347 	es_deleteStr(pWrkrData->batch.data);
348 	free(pWrkrData->reply);
349 ENDfreeWrkrInstance
350 
351 BEGINdbgPrintInstInfo
352 	int i;
353 CODESTARTdbgPrintInstInfo
354 	dbgprintf("omelasticsearch\n");
355 	dbgprintf("\ttemplate='%s'\n", pData->tplName);
356 	dbgprintf("\tnumServers=%d\n", pData->numServers);
357 	dbgprintf("\thealthCheckTimeout=%lu\n", pData->healthCheckTimeout);
358 	dbgprintf("\tserverBaseUrls=");
359 	for(i = 0 ; i < pData->numServers ; ++i)
360 		dbgprintf("%c'%s'", i == 0 ? '[' : ' ', pData->serverBaseUrls[i]);
361 	dbgprintf("]\n");
362 	dbgprintf("\tdefaultPort=%d\n", pData->defaultPort);
363 	dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid);
364 	dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
365 	dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
366 	dbgprintf("\tsearch type='%s'\n", pData->searchType);
367 	dbgprintf("\tpipeline name='%s'\n", pData->pipelineName);
368 	dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName);
369 	dbgprintf("\tskipPipelineIfEmpty=%d\n", pData->skipPipelineIfEmpty);
370 	dbgprintf("\tparent='%s'\n", pData->parent);
371 	dbgprintf("\ttimeout='%s'\n", pData->timeout);
372 	dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx);
373 	dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType);
374 	dbgprintf("\tdynamic parent=%d\n", pData->dynParent);
375 	dbgprintf("\tuse https=%d\n", pData->useHttps);
376 	dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
377 	dbgprintf("\tmaxbytes=%zu\n", pData->maxbytes);
378 	dbgprintf("\tallowUnsignedCerts=%d\n", pData->allowUnsignedCerts);
379 	dbgprintf("\tskipVerifyHost=%d\n", pData->skipVerifyHost);
380 	dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
381 		(uchar*)"(not configured)" : pData->errorFile);
382 	dbgprintf("\terroronly=%d\n", pData->errorOnly);
383 	dbgprintf("\tinterleaved=%d\n", pData->interleaved);
384 	dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId);
385 	dbgprintf("\tbulkid='%s'\n", pData->bulkId);
386 	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
387 	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
388 	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
389 	dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
390 	dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
391 	dbgprintf("\tratelimit.interval='%u'\n", pData->ratelimitInterval);
392 	dbgprintf("\tratelimit.burst='%u'\n", pData->ratelimitBurst);
393 	dbgprintf("\trebindinterval='%d'\n", pData->rebindInterval);
394 ENDdbgPrintInstInfo
395 
396 
397 /* elasticsearch POST result string ... useful for debugging */
398 static size_t
curlResult(void * const ptr,const size_t size,const size_t nmemb,void * const userdata)399 curlResult(void *const ptr, const size_t size, const size_t nmemb, void *const userdata)
400 {
401 	const char *const p = (const char *)ptr;
402 	wrkrInstanceData_t *const pWrkrData = (wrkrInstanceData_t*) userdata;
403 	char *buf;
404 	const size_t size_add = size*nmemb;
405 	size_t newlen;
406 	PTR_ASSERT_CHK(pWrkrData, WRKR_DATA_TYPE_ES);
407 	newlen = pWrkrData->replyLen + size_add;
408 	if(newlen + 1 > pWrkrData->replyBufLen) {
409 		if((buf = realloc(pWrkrData->reply, pWrkrData->replyBufLen + size_add + 1)) == NULL) {
410 			LogError(errno, RS_RET_ERR, "omelasticsearch: realloc failed in curlResult");
411 			return 0; /* abort due to failure */
412 		}
413 		pWrkrData->replyBufLen += size_add + 1;
414 		pWrkrData->reply = buf;
415 	}
416 	memcpy(pWrkrData->reply+pWrkrData->replyLen, p, size_add);
417 	pWrkrData->replyLen = newlen;
418 	return size_add;
419 }
420 
421 /* Build basic URL part, which includes hostname and port as follows:
422  * http://hostname:port/ based on a server param
423  * Newly creates a cstr for this purpose.
424  * Note: serverParam MUST NOT end in '/' (caller must strip if it exists)
425  */
426 static rsRetVal
computeBaseUrl(const char * const serverParam,const int defaultPort,const sbool useHttps,uchar ** baseUrl)427 computeBaseUrl(const char*const serverParam,
428 	const int defaultPort,
429 	const sbool useHttps,
430 	uchar **baseUrl)
431 {
432 #	define SCHEME_HTTPS "https://"
433 #	define SCHEME_HTTP "http://"
434 
435 	char portBuf[64];
436 	int r = 0;
437 	const char *host = serverParam;
438 	DEFiRet;
439 
440 	assert(serverParam[strlen(serverParam)-1] != '/');
441 
442 	es_str_t *urlBuf = es_newStr(256);
443 	if (urlBuf == NULL) {
444 		LogError(0, RS_RET_OUT_OF_MEMORY,
445 		"omelasticsearch: failed to allocate es_str urlBuf in computeBaseUrl");
446 		ABORT_FINALIZE(RS_RET_ERR);
447 	}
448 
449 	/* Find where the hostname/ip of the server starts. If the scheme is not specified
450 	  in the uri, start the buffer with a scheme corresponding to the useHttps parameter.
451 	*/
452 	if (strcasestr(serverParam, SCHEME_HTTP))
453 		host = serverParam + strlen(SCHEME_HTTP);
454 	else if (strcasestr(serverParam, SCHEME_HTTPS))
455 		host = serverParam + strlen(SCHEME_HTTPS);
456 	else
457 		r = useHttps ? es_addBuf(&urlBuf, SCHEME_HTTPS, sizeof(SCHEME_HTTPS)-1) :
458 			es_addBuf(&urlBuf, SCHEME_HTTP, sizeof(SCHEME_HTTP)-1);
459 
460 	if (r == 0) r = es_addBuf(&urlBuf, (char *)serverParam, strlen(serverParam));
461 	if (r == 0 && !strchr(host, ':')) {
462 		snprintf(portBuf, sizeof(portBuf), ":%d", defaultPort);
463 		r = es_addBuf(&urlBuf, portBuf, strlen(portBuf));
464 	}
465 	if (r == 0) r = es_addChar(&urlBuf, '/');
466 	if (r == 0) *baseUrl = (uchar*) es_str2cstr(urlBuf, NULL);
467 
468 	if (r != 0 || baseUrl == NULL) {
469 		LogError(0, RS_RET_ERR,
470 			"omelasticsearch: error occurred computing baseUrl from server %s", serverParam);
471 		ABORT_FINALIZE(RS_RET_ERR);
472 	}
473 finalize_it:
474 	if (urlBuf) {
475 		es_deleteStr(urlBuf);
476 	}
477 	RETiRet;
478 }
479 
480 static inline void
incrementServerIndex(wrkrInstanceData_t * pWrkrData)481 incrementServerIndex(wrkrInstanceData_t *pWrkrData)
482 {
483 	pWrkrData->serverIndex = (pWrkrData->serverIndex + 1) % pWrkrData->pData->numServers;
484 }
485 
486 
487 /* checks if connection to ES can be established; also iterates over
488  * potential servers to support high availability (HA) feature. If it
489  * needs to switch server, will record new one in curl handle.
490  */
ATTR_NONNULL()491 static rsRetVal ATTR_NONNULL()
492 checkConn(wrkrInstanceData_t *const pWrkrData)
493 {
494 #	define HEALTH_URI "_cat/health"
495 	CURL *curl;
496 	CURLcode res;
497 	es_str_t *urlBuf;
498 	char* healthUrl = NULL;
499 	char* serverUrl;
500 	int i;
501 	int r;
502 	DEFiRet;
503 
504 	pWrkrData->replyLen = 0;
505 	curl = pWrkrData->curlCheckConnHandle;
506 	urlBuf = es_newStr(256);
507 	if (urlBuf == NULL) {
508 		LogError(0, RS_RET_OUT_OF_MEMORY,
509 			"omelasticsearch: unable to allocate buffer for health check uri.");
510 		ABORT_FINALIZE(RS_RET_SUSPENDED);
511 	}
512 
513 	for(i = 0; i < pWrkrData->pData->numServers; ++i) {
514 		serverUrl = (char*) pWrkrData->pData->serverBaseUrls[pWrkrData->serverIndex];
515 
516 		es_emptyStr(urlBuf);
517 		r = es_addBuf(&urlBuf, serverUrl, strlen(serverUrl));
518 		if(r == 0) r = es_addBuf(&urlBuf, HEALTH_URI, sizeof(HEALTH_URI)-1);
519 		if(r == 0) healthUrl = es_str2cstr(urlBuf, NULL);
520 		if(r != 0 || healthUrl == NULL) {
521 			LogError(0, RS_RET_OUT_OF_MEMORY,
522 				"omelasticsearch: unable to allocate buffer for health check uri.");
523 			ABORT_FINALIZE(RS_RET_SUSPENDED);
524 		}
525 
526 		curl_easy_setopt(curl, CURLOPT_URL, healthUrl);
527 		curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlResult);
528 		res = curl_easy_perform(curl);
529 		free(healthUrl);
530 
531 		if (res == CURLE_OK) {
532 			DBGPRINTF("omelasticsearch: checkConn %s completed with success "
533 				"on attempt %d\n", serverUrl, i);
534 			ABORT_FINALIZE(RS_RET_OK);
535 		}
536 
537 		DBGPRINTF("omelasticsearch: checkConn %s failed on attempt %d: %s\n",
538 			serverUrl, i, curl_easy_strerror(res));
539 		STATSCOUNTER_INC(checkConnFail, mutCheckConnFail);
540 		incrementServerIndex(pWrkrData);
541 	}
542 
543 	LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
544 		"omelasticsearch: checkConn failed after %d attempts.", i);
545 	ABORT_FINALIZE(RS_RET_SUSPENDED);
546 
547 finalize_it:
548 	if(urlBuf != NULL)
549 		es_deleteStr(urlBuf);
550 	RETiRet;
551 }
552 
553 
/* tryResume entry point: the result is that of checkConn(). It is
 * RS_RET_OK once some configured server answers the health check, and
 * RS_RET_SUSPENDED otherwise.
 */
BEGINtryResume
CODESTARTtryResume
	DBGPRINTF("omelasticsearch: tryResume called\n");
	iRet = checkConn(pWrkrData);
ENDtryResume
559 
560 
561 /* get the current index and type for this message */
562 static void ATTR_NONNULL(1)
getIndexTypeAndParent(const instanceData * const pData,uchar ** const tpls,uchar ** const srchIndex,uchar ** const srchType,uchar ** const parent,uchar ** const bulkId,uchar ** const pipelineName)563 getIndexTypeAndParent(const instanceData *const pData, uchar **const tpls,
564 		      uchar **const srchIndex, uchar **const srchType, uchar **const parent,
565 		      uchar **const bulkId, uchar **const pipelineName)
566 {
567 	*srchIndex = pData->searchIndex;
568 	*parent = pData->parent;
569 	*srchType = pData->searchType;
570 	*bulkId = pData->bulkId;
571 	*pipelineName = pData->pipelineName;
572 	if(tpls == NULL) {
573 		goto done;
574 	}
575 
576 	int iNumTpls = 1;
577 	if(pData->dynSrchIdx) {
578 		*srchIndex = tpls[iNumTpls];
579 		++iNumTpls;
580 	}
581 	if(pData->dynSrchType) {
582 		*srchType = tpls[iNumTpls];
583 		++iNumTpls;
584 	}
585 	if(pData->dynParent) {
586 		*parent = tpls[iNumTpls];
587 		++iNumTpls;
588 	}
589 	if(pData->dynBulkId) {
590 		*bulkId = tpls[iNumTpls];
591 		++iNumTpls;
592 	}
593 	if(pData->dynPipelineName) {
594 		*pipelineName = tpls[iNumTpls];
595 		++iNumTpls;
596 	}
597 
598 done:
599 	assert(srchIndex != NULL);
600 	assert(srchType != NULL);
601 	return;
602 }
603 
604 
605 static rsRetVal ATTR_NONNULL(1)
setPostURL(wrkrInstanceData_t * const pWrkrData,uchar ** const tpls)606 setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
607 {
608 	uchar *searchIndex = NULL;
609 	uchar *searchType;
610 	uchar *pipelineName;
611 	uchar *parent;
612 	uchar *bulkId;
613 	char* baseUrl;
614 	es_str_t *url;
615 	int r;
616 	DEFiRet;
617 	instanceData *const pData = pWrkrData->pData;
618 	char separator;
619 	const int bulkmode = pData->bulkmode;
620 
621 	baseUrl = (char*)pData->serverBaseUrls[pWrkrData->serverIndex];
622 	url = es_newStrFromCStr(baseUrl, strlen(baseUrl));
623 	if (url == NULL) {
624 		LogError(0, RS_RET_OUT_OF_MEMORY,
625 			"omelasticsearch: error allocating new estr for POST url.");
626 		ABORT_FINALIZE(RS_RET_ERR);
627 	}
628 
629 	separator = '?';
630 
631 	if(bulkmode) {
632 		r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
633 		parent = NULL;
634 	} else {
635 		getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
636 		r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
637 		if(r == 0) r = es_addChar(&url, '/');
638 		if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
639 		if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
640 			if(r == 0) r = es_addChar(&url, separator);
641 			if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1);
642 			if(r == 0) r = es_addBuf(&url, (char*)pipelineName, ustrlen(pipelineName));
643 			separator = '&';
644 		}
645 	}
646 
647 	if(pData->timeout != NULL) {
648 		if(r == 0) r = es_addChar(&url, separator);
649 		if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1);
650 		if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout));
651 		separator = '&';
652 	}
653 
654 	if(parent != NULL) {
655 		if(r == 0) r = es_addChar(&url, separator);
656 		if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1);
657 		if(r == 0) es_addBuf(&url, (char*)parent, ustrlen(parent));
658 	}
659 
660 	if(pWrkrData->restURL != NULL)
661 		free(pWrkrData->restURL);
662 
663 	pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL);
664 	curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_URL, pWrkrData->restURL);
665 	DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL);
666 
667 finalize_it:
668 	if (url != NULL)
669 		es_deleteStr(url);
670 	RETiRet;
671 }
672 
673 
674 /* this method computes the expected size of adding the next message into
675  * the batched request to elasticsearch
676  */
677 static size_t
computeMessageSize(const wrkrInstanceData_t * const pWrkrData,const uchar * const message,uchar ** const tpls)678 computeMessageSize(const wrkrInstanceData_t *const pWrkrData,
679 	const uchar *const message,
680 	uchar **const tpls)
681 {
682 	size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
683 	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
684 		r += sizeof(META_STRT_CREATE)-1;
685 	else
686 		r += sizeof(META_STRT)-1;
687 
688 	uchar *searchIndex = NULL;
689 	uchar *searchType;
690 	uchar *parent = NULL;
691 	uchar *bulkId = NULL;
692 	uchar *pipelineName;
693 
694 	getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
695 	r += ustrlen((char *)message) + ustrlen(searchIndex) + ustrlen(searchType);
696 
697 	if(parent != NULL) {
698 		r += sizeof(META_PARENT)-1 + ustrlen(parent);
699 	}
700 	if(bulkId != NULL) {
701 		r += sizeof(META_ID)-1 + ustrlen(bulkId);
702 	}
703 	if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
704 		r += sizeof(META_PIPELINE)-1 + ustrlen(pipelineName);
705 	}
706 
707 	return r;
708 }
709 
710 
711 /* this method does not directly submit but builds a batch instead. It
712  * may submit, if we have dynamic index/type and the current type or
713  * index changes.
714  */
715 static rsRetVal
buildBatch(wrkrInstanceData_t * pWrkrData,uchar * message,uchar ** tpls)716 buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
717 {
718 	int length = strlen((char *)message);
719 	int r;
720 	uchar *searchIndex = NULL;
721 	uchar *searchType;
722 	uchar *parent = NULL;
723 	uchar *bulkId = NULL;
724 	uchar *pipelineName;
725 	DEFiRet;
726 
727 	getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
728 	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
729 		r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
730 	else
731 		r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
732 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
733 				 ustrlen(searchIndex));
734 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
735 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
736 				 ustrlen(searchType));
737 	if(parent != NULL) {
738 		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
739 		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent));
740 	}
741 	if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
742 		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PIPELINE, sizeof(META_PIPELINE)-1);
743 		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)pipelineName, ustrlen(pipelineName));
744 	}
745 	if(bulkId != NULL) {
746 		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1);
747 		if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId));
748 	}
749 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
750 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length);
751 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1);
752 	if(r != 0) {
753 		LogError(0, RS_RET_ERR,
754 			"omelasticsearch: growing batch failed with code %d", r);
755 		ABORT_FINALIZE(RS_RET_ERR);
756 	}
757 	++pWrkrData->batch.nmemb;
758 	iRet = RS_RET_OK;
759 
760 finalize_it:
761 	RETiRet;
762 }
763 
764 /*
765  * Dumps entire bulk request and response in error log
766  */
767 static rsRetVal
getDataErrorDefault(wrkrInstanceData_t * pWrkrData,fjson_object ** pReplyRoot,uchar * reqmsg,char ** rendered)768 getDataErrorDefault(wrkrInstanceData_t *pWrkrData,fjson_object **pReplyRoot,uchar *reqmsg,char **rendered)
769 {
770 	DEFiRet;
771 	fjson_object *req=NULL;
772 	fjson_object *errRoot=NULL;
773 	fjson_object *replyRoot = *pReplyRoot;
774 
775 	if((req=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
776 	fjson_object_object_add(req, "url", fjson_object_new_string((char*)pWrkrData->restURL));
777 	fjson_object_object_add(req, "postdata", fjson_object_new_string((char*)reqmsg));
778 
779 	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
780 	fjson_object_object_add(errRoot, "request", req);
781 	fjson_object_object_add(errRoot, "reply", replyRoot);
782 	*rendered = strdup((char*)fjson_object_to_json_string(errRoot));
783 
784 	req=NULL;
785 	fjson_object_put(errRoot);
786 
787 	*pReplyRoot = NULL; /* tell caller not to delete once again! */
788 
789 	finalize_it:
790 		fjson_object_put(req);
791 		RETiRet;
792 }
793 
794 /*
795  * Sets bulkRequestNextSectionStart pointer to next sections start in the buffer pointed by bulkRequest.
796  * Sections are marked by { and }
797  */
798 static rsRetVal
getSection(const char * bulkRequest,const char ** bulkRequestNextSectionStart)799 getSection(const char* bulkRequest, const char **bulkRequestNextSectionStart )
800 {
801 		DEFiRet;
802 		char* idx =0;
803 		if( (idx = strchr(bulkRequest,'\n')) != 0)/*intermediate section*/
804 		{
805 			*bulkRequestNextSectionStart = ++idx;
806 		}
807 		else
808 		{
809 			*bulkRequestNextSectionStart=0;
810 			ABORT_FINALIZE(RS_RET_ERR);
811 		}
812 
813 	     finalize_it:
814 	     	  RETiRet;
815 }
816 
817 /*
818  * Sets the new string in singleRequest for one request in bulkRequest
819  * and sets lastLocation pointer to the location till which bulkrequest has been parsed.
820  * (used as input to make function thread safe.)
821  */
822 static rsRetVal
getSingleRequest(const char * bulkRequest,char ** singleRequest,const char ** lastLocation)823 getSingleRequest(const char* bulkRequest, char** singleRequest, const char **lastLocation)
824 {
825 	DEFiRet;
826 	const char *req = bulkRequest;
827 	const char *start = bulkRequest;
828 	if (getSection(req,&req)!=RS_RET_OK)
829 		ABORT_FINALIZE(RS_RET_ERR);
830 
831 	if (getSection(req,&req)!=RS_RET_OK)
832 			ABORT_FINALIZE(RS_RET_ERR);
833 
834 	CHKmalloc(*singleRequest = (char*) calloc (req - start+ 1 + 1,1));
835 	/* (req - start+ 1 == length of data + 1 for terminal char)*/
836 	memcpy(*singleRequest,start,req - start);
837 	*lastLocation=req;
838 
839 finalize_it:
840 	RETiRet;
841 }
842 
843 /*
844  * check the status of response from ES
845  */
checkReplyStatus(fjson_object * ok)846 static int checkReplyStatus(fjson_object* ok) {
847 	return (ok == NULL || !fjson_object_is_type(ok, fjson_type_int) || fjson_object_get_int(ok) < 0 ||
848 		fjson_object_get_int(ok) > 299);
849 }
850 
/*
 * Context object for error file content creation or status check
 * response_item - the full {"create":{"_index":"idxname",.....}}
 * response_body - the inner hash of the response_item - {"_index":"idxname",...}
 * status - the "status" field from the inner hash - "status":500
 *          should be able to use fjson_object_get_int(status) to get the http result code
 */
typedef struct exeContext{
	/* nonzero: parseRequestAndResponseForContext only checks item statuses
	 * and aborts with RS_RET_DATAFAIL on the first failed item */
	int statusCheckOnly;
	/* NOTE(review): presumably the JSON object the error-file record is collected in */
	fjson_object *errRoot;
	/* per-item hook receiving the item status, the matching request and
	 * response text, and the reply item/body/status objects */
	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
			fjson_object *response_item, fjson_object *response_body, fjson_object *status);
	es_write_ops_t writeOperation;
	ratelimit_t *ratelimiter;	/* NOTE(review): presumably for retried messages */
	ruleset_t *retryRuleset;	/* NOTE(review): presumably the ruleset retried messages go to */
	struct json_tokener *jTokener;
} context;
868 
869 /*
870  * get content to be written in error file using context passed
871  */
872 static rsRetVal
parseRequestAndResponseForContext(wrkrInstanceData_t * pWrkrData,fjson_object ** pReplyRoot,uchar * reqmsg,context * ctx)873 parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **pReplyRoot,uchar *reqmsg,context *ctx)
874 {
875 	DEFiRet;
876 	fjson_object *replyRoot = *pReplyRoot;
877 	int i;
878 	int numitems;
879 	fjson_object *items=NULL, *jo_errors = NULL;
880 	int errors = 0;
881 
882 	if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
883 		errors = fjson_object_get_boolean(jo_errors);
884 		if (!errors && pWrkrData->pData->retryFailures) {
885 			return RS_RET_OK;
886 		}
887 	}
888 
889 	/*iterate over items*/
890 	if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
891 		LogError(0, RS_RET_DATAFAIL,
892 			"omelasticsearch: error in elasticsearch reply: "
893 			"bulkmode insert does not return array, reply is: %s",
894 			pWrkrData->reply);
895 		ABORT_FINALIZE(RS_RET_DATAFAIL);
896 	}
897 
898 	numitems = fjson_object_array_length(items);
899 
900 	if (reqmsg) {
901 		DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
902 	} else {
903 		DBGPRINTF("omelasticsearch: Empty request\n");
904 	}
905 	const char *lastReqRead= (char*)reqmsg;
906 
907 	DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
908 	for(i = 0 ; i < numitems ; ++i) {
909 
910 		fjson_object *item=NULL;
911 		fjson_object *result=NULL;
912 		fjson_object *ok=NULL;
913 		int itemStatus=0;
914 		item = fjson_object_array_get_idx(items, i);
915 		if(item == NULL)  {
916 			LogError(0, RS_RET_DATAFAIL,
917 				"omelasticsearch: error in elasticsearch reply: "
918 				"cannot obtain reply array item %d", i);
919 			ABORT_FINALIZE(RS_RET_DATAFAIL);
920 		}
921 		fjson_object_object_get_ex(item, "create", &result);
922 		if(result == NULL || !fjson_object_is_type(result, fjson_type_object)) {
923 			fjson_object_object_get_ex(item, "index", &result);
924 			if(result == NULL || !fjson_object_is_type(result, fjson_type_object)) {
925 				LogError(0, RS_RET_DATAFAIL,
926 					"omelasticsearch: error in elasticsearch reply: "
927 					"cannot obtain 'result' item for #%d", i);
928 				ABORT_FINALIZE(RS_RET_DATAFAIL);
929 			}
930 		}
931 
932 		fjson_object_object_get_ex(result, "status", &ok);
933 		itemStatus = checkReplyStatus(ok);
934 
935 		char *request =0;
936 		char *response =0;
937 		if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
938 			if(itemStatus) {
939 				DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, "
940 					"status is %d\n", i, fjson_object_get_int(ok));
941 				DBGPRINTF("omelasticsearch: status check found error.\n");
942 				ABORT_FINALIZE(RS_RET_DATAFAIL);
943 			}
944 
945 		} else {
946 			if(getSingleRequest(lastReqRead,&request,&lastReqRead) != RS_RET_OK) {
947 				DBGPRINTF("omelasticsearch: Couldn't get post request\n");
948 				ABORT_FINALIZE(RS_RET_ERR);
949 			}
950 			response = (char*)fjson_object_to_json_string_ext(result, FJSON_TO_STRING_PLAIN);
951 
952 			if(response==NULL) {
953 				free(request);/*as its has been assigned.*/
954 				DBGPRINTF("omelasticsearch: Error getting fjson_object_to_string_ext. Cannot "
955 					"continue\n");
956 				ABORT_FINALIZE(RS_RET_ERR);
957 			}
958 
959 			/*call the context*/
960 			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
961 					response, item, result, ok);
962 
963 			/*free memory in any case*/
964 			free(request);
965 
966 			if(ret != RS_RET_OK) {
967 				DBGPRINTF("omelasticsearch: Error in preparing errorfileContent. Cannot continue\n");
968 				ABORT_FINALIZE(RS_RET_ERR);
969 			}
970 		}
971 	}
972 
973 finalize_it:
974 	RETiRet;
975 }
976 
977 /*
978  * Dumps only failed requests of bulk insert
979  */
980 static rsRetVal
getDataErrorOnly(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)981 getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
982 		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
983 {
984 	DEFiRet;
985 	(void)response_item; /* unused */
986 	(void)response_body; /* unused */
987 	(void)status; /* unused */
988 	if(itemStatus) {
989 		fjson_object *onlyErrorResponses =NULL;
990 		fjson_object *onlyErrorRequests=NULL;
991 
992 		if(!fjson_object_object_get_ex(ctx->errRoot, "reply", &onlyErrorResponses)) {
993 			DBGPRINTF("omelasticsearch: Failed to get reply json array. Invalid context. Cannot "
994 				"continue\n");
995 			ABORT_FINALIZE(RS_RET_ERR);
996 		}
997 		fjson_object_array_add(onlyErrorResponses, fjson_object_new_string(response));
998 
999 		if(!fjson_object_object_get_ex(ctx->errRoot, "request", &onlyErrorRequests)) {
1000 			DBGPRINTF("omelasticsearch: Failed to get request json array. Invalid context. Cannot "
1001 				"continue\n");
1002 			ABORT_FINALIZE(RS_RET_ERR);
1003 		}
1004 
1005 		fjson_object_array_add(onlyErrorRequests, fjson_object_new_string(request));
1006 
1007 	}
1008 
1009 finalize_it:
1010 	RETiRet;
1011 }
1012 
1013 /*
1014  * Dumps all requests of bulk insert interleaved with request and response
1015  */
1016 
1017 static rsRetVal
getDataInterleaved(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)1018 getDataInterleaved(context *ctx,
1019 	int __attribute__((unused)) itemStatus,
1020 	char *request,
1021 	char *response,
1022 	fjson_object *response_item,
1023 	fjson_object *response_body,
1024 	fjson_object *status
1025 )
1026 {
1027 	DEFiRet;
1028 	(void)response_item; /* unused */
1029 	(void)response_body; /* unused */
1030 	(void)status; /* unused */
1031 	fjson_object *interleaved =NULL;
1032 	if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved)) {
1033 		DBGPRINTF("omelasticsearch: Failed to get response json array. Invalid context. Cannot continue\n");
1034 		ABORT_FINALIZE(RS_RET_ERR);
1035 	}
1036 
1037 	fjson_object *interleavedNode=NULL;
1038 	/*create interleaved node that has req and response json data*/
1039 	if((interleavedNode=fjson_object_new_object()) == NULL)
1040 	{
1041 		DBGPRINTF("omelasticsearch: Failed to create interleaved node. Cann't continue\n");
1042 		ABORT_FINALIZE(RS_RET_ERR);
1043 	}
1044 	fjson_object_object_add(interleavedNode,"request", fjson_object_new_string(request));
1045 	fjson_object_object_add(interleavedNode,"reply", fjson_object_new_string(response));
1046 
1047 	fjson_object_array_add(interleaved, interleavedNode);
1048 
1049 
1050 
1051 	finalize_it:
1052 		RETiRet;
1053 }
1054 
1055 
1056 /*
1057  * Dumps only failed requests of bulk insert interleaved with request and response
1058  */
1059 
1060 static rsRetVal
getDataErrorOnlyInterleaved(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)1061 getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
1062 		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
1063 {
1064 	DEFiRet;
1065 	if (itemStatus) {
1066 		if(getDataInterleaved(ctx, itemStatus,request,response,
1067 				response_item, response_body, status)!= RS_RET_OK) {
1068 			ABORT_FINALIZE(RS_RET_ERR);
1069 		}
1070 	}
1071 
1072 	finalize_it:
1073 		RETiRet;
1074 }
1075 
1076 /* input JSON looks like this:
1077  * {"someoperation":{"field1":"value1","field2":{.....}}}
1078  * output looks like this:
1079  * {"writeoperation":"someoperation","field1":"value1","field2":{.....}}
1080  */
1081 static rsRetVal
formatBulkReqOrResp(fjson_object * jo_input,fjson_object * jo_output)1082 formatBulkReqOrResp(fjson_object *jo_input, fjson_object *jo_output)
1083 {
1084 	DEFiRet;
1085 	fjson_object *jo = NULL;
1086 	struct json_object_iterator it = json_object_iter_begin(jo_input);
1087 	struct json_object_iterator itEnd = json_object_iter_end(jo_input);
1088 
1089 	/* set the writeoperation if not already set */
1090 	if (!fjson_object_object_get_ex(jo_output, "writeoperation", NULL)) {
1091 		const char *optype = NULL;
1092 		if (!json_object_iter_equal(&it, &itEnd))
1093 			optype = json_object_iter_peek_name(&it);
1094 		if (optype) {
1095 			jo = json_object_new_string(optype);
1096 		} else {
1097 			jo = json_object_new_string("unknown");
1098 		}
1099 		CHKmalloc(jo);
1100 		json_object_object_add(jo_output, "writeoperation", jo);
1101 	}
1102 	if (!json_object_iter_equal(&it, &itEnd)) {
1103 		/* now iterate the operation object */
1104 		jo = json_object_iter_peek_value(&it);
1105 		it = json_object_iter_begin(jo);
1106 		itEnd = json_object_iter_end(jo);
1107 		while (!json_object_iter_equal(&it, &itEnd)) {
1108 			const char *name = json_object_iter_peek_name(&it);
1109 			/* do not overwrite existing fields */
1110 			if (!fjson_object_object_get_ex(jo_output, name, NULL)) {
1111 				json_object_object_add(jo_output, name,
1112 						json_object_get(json_object_iter_peek_value(&it)));
1113 			}
1114 			json_object_iter_next(&it);
1115 		}
1116 	}
1117 finalize_it:
1118 	RETiRet;
1119 }
1120 
1121 /* request string looks like this (other fields are "_parent" and "pipeline")
1122  * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
1123  *   \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
1124  * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
1125  * store the metadata header fields in the metadata object
1126  * start = first \n + 1
1127  * end = last \n
1128  */
1129 static rsRetVal
createMsgFromRequest(const char * request,context * ctx,smsg_t ** msg,fjson_object * omes)1130 createMsgFromRequest(const char *request, context *ctx, smsg_t **msg, fjson_object *omes)
1131 {
1132 	DEFiRet;
1133 	fjson_object *jo_msg = NULL, *jo_metadata = NULL, *jo_request = NULL;
1134 	const char *datastart, *dataend;
1135 	size_t datalen;
1136 	enum json_tokener_error json_error;
1137 
1138 	*msg = NULL;
1139 	if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
1140 		LogError(0, RS_RET_ERR,
1141 			"omelasticsearch: malformed original request - "
1142 			"could not find start of original data [%s]",
1143 			request);
1144 		ABORT_FINALIZE(RS_RET_ERR);
1145 	}
1146 	datalen = datastart - request;
1147 	json_tokener_reset(ctx->jTokener);
1148 	jo_metadata = json_tokener_parse_ex(ctx->jTokener, request, datalen);
1149 	json_error = fjson_tokener_get_error(ctx->jTokener);
1150 	if (!jo_metadata || (json_error != fjson_tokener_success)) {
1151 		LogError(0, RS_RET_ERR,
1152 			"omelasticsearch: parse error [%s] - could not convert original "
1153 			"request metadata header JSON back into JSON object [%s]",
1154 			fjson_tokener_error_desc(json_error), request);
1155 		ABORT_FINALIZE(RS_RET_ERR);
1156 	}
1157 	CHKiRet(formatBulkReqOrResp(jo_metadata, omes));
1158 
1159 	datastart++; /* advance to '{' */
1160 	if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
1161 		LogError(0, RS_RET_ERR,
1162 			"omelasticsearch: malformed original request - "
1163 			"could not find end of original data [%s]",
1164 			request);
1165 		ABORT_FINALIZE(RS_RET_ERR);
1166 	}
1167 	datalen = dataend - datastart;
1168 	json_tokener_reset(ctx->jTokener);
1169 	jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
1170 	json_error = fjson_tokener_get_error(ctx->jTokener);
1171 	if (!jo_request || (json_error != fjson_tokener_success)) {
1172 		LogError(0, RS_RET_ERR,
1173 			"omelasticsearch: parse error [%s] - could not convert original "
1174 			"request JSON back into JSON object [%s]",
1175 			fjson_tokener_error_desc(json_error), request);
1176 		ABORT_FINALIZE(RS_RET_ERR);
1177 	}
1178 
1179 	CHKiRet(msgConstruct(msg));
1180 	MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
1181 	MsgSetInputName(*msg, pInputName);
1182 	if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
1183 		const char *rawmsg = json_object_get_string(jo_msg);
1184 		const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
1185 		MsgSetRawMsg(*msg, rawmsg, msgLen);
1186 	} else {
1187 		/* use entire data part of request as rawmsg */
1188 		MsgSetRawMsg(*msg, datastart, datalen);
1189 	}
1190 	MsgSetMSGoffs(*msg, 0);	/* we do not have a header... */
1191 	MsgSetTAG(*msg, (const uchar *)"omes", 4);
1192 	CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
1193 
1194 finalize_it:
1195 	if (jo_metadata)
1196 		json_object_put(jo_metadata);
1197 	RETiRet;
1198 }
1199 
1200 
1201 static rsRetVal
getDataRetryFailures(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)1202 getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
1203 		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
1204 {
1205 	DEFiRet;
1206 	fjson_object *omes = NULL, *jo = NULL;
1207 	int istatus = fjson_object_get_int(status);
1208 	int iscreateop = 0;
1209 	const char *optype = NULL;
1210 	smsg_t *msg = NULL;
1211 	int need_free_omes = 0;
1212 
1213 	(void)response;
1214 	(void)itemStatus;
1215 	(void)response_body;
1216 	CHKmalloc(omes = json_object_new_object());
1217 	need_free_omes = 1;
1218 	/* this adds metadata header fields to omes */
1219 	if (RS_RET_OK != (iRet = createMsgFromRequest(request, ctx, &msg, omes))) {
1220 		if (iRet != RS_RET_OUT_OF_MEMORY) {
1221 			STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1222 		} else {
1223 			ABORT_FINALIZE(iRet);
1224 		}
1225 	}
1226 	CHKmalloc(msg);
1227 	/* this adds response fields as local variables to omes */
1228 	if (RS_RET_OK != (iRet = formatBulkReqOrResp(response_item, omes))) {
1229 		if (iRet != RS_RET_OUT_OF_MEMORY) {
1230 			STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1231 		} else {
1232 			ABORT_FINALIZE(iRet);
1233 		}
1234 	}
1235 	if (fjson_object_object_get_ex(omes, "writeoperation", &jo)) {
1236 		optype = json_object_get_string(jo);
1237 		if (optype && !strcmp("create", optype))
1238 			iscreateop = 1;
1239 		if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
1240 			iscreateop = 1;
1241 	}
1242 
1243 	if (!optype) {
1244 		STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1245 		LogMsg(0, RS_RET_ERR, LOG_INFO,
1246 			"omelasticsearch: no recognized operation type in response [%s]",
1247 			response);
1248 	} else if ((istatus == 200) || (istatus == 201)) {
1249 		STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
1250 	} else if ((istatus == 409) && iscreateop) {
1251 		STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
1252 	} else if ((istatus == 400) || (istatus < 200)) {
1253 		STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
1254 	} else {
1255 		fjson_object *error = NULL, *errtype = NULL;
1256 		if(fjson_object_object_get_ex(omes, "error", &error) &&
1257 		   fjson_object_object_get_ex(error, "type", &errtype)) {
1258 			if (istatus == 429) {
1259 				STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
1260 			} else {
1261 				STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
1262 			}
1263 		} else {
1264 			STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1265 			LogMsg(0, RS_RET_ERR, LOG_INFO,
1266 				"omelasticsearch: unexpected error response [%s]",
1267 				response);
1268 		}
1269 	}
1270 	need_free_omes = 0;
1271 	CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
1272 	MsgSetRuleset(msg, ctx->retryRuleset);
1273 	CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
1274 finalize_it:
1275 	if (need_free_omes)
1276 		json_object_put(omes);
1277 	RETiRet;
1278 }
1279 
1280 /*
1281  * get erroronly context
1282  */
1283 static rsRetVal
initializeErrorOnlyConext(wrkrInstanceData_t * pWrkrData,context * ctx)1284 initializeErrorOnlyConext(wrkrInstanceData_t *pWrkrData,context *ctx){
1285 	DEFiRet;
1286 	ctx->statusCheckOnly=0;
1287 	fjson_object *errRoot=NULL;
1288 	fjson_object *onlyErrorResponses =NULL;
1289 	fjson_object *onlyErrorRequests=NULL;
1290 	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1291 
1292 	if((onlyErrorResponses=fjson_object_new_array()) == NULL) {
1293 		fjson_object_put(errRoot);
1294 		ABORT_FINALIZE(RS_RET_ERR);
1295 	}
1296 	if((onlyErrorRequests=fjson_object_new_array()) == NULL) {
1297 		fjson_object_put(errRoot);
1298 		fjson_object_put(onlyErrorResponses);
1299 		ABORT_FINALIZE(RS_RET_ERR);
1300 	}
1301 
1302 	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1303 	fjson_object_object_add(errRoot,"request",onlyErrorRequests);
1304 	fjson_object_object_add(errRoot, "reply", onlyErrorResponses);
1305 	ctx->errRoot = errRoot;
1306 	ctx->prepareErrorFileContent= &getDataErrorOnly;
1307 	finalize_it:
1308 		RETiRet;
1309 }
1310 
1311 /*
1312  * get interleaved context
1313  */
1314 static rsRetVal
initializeInterleavedConext(wrkrInstanceData_t * pWrkrData,context * ctx)1315 initializeInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
1316 	DEFiRet;
1317 	ctx->statusCheckOnly=0;
1318 	fjson_object *errRoot=NULL;
1319 	fjson_object *interleaved =NULL;
1320 	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1321 	if((interleaved=fjson_object_new_array()) == NULL) {
1322 		fjson_object_put(errRoot);
1323 		ABORT_FINALIZE(RS_RET_ERR);
1324 	}
1325 
1326 
1327 	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1328 	fjson_object_object_add(errRoot,"response",interleaved);
1329 	ctx->errRoot = errRoot;
1330 	ctx->prepareErrorFileContent= &getDataInterleaved;
1331 	finalize_it:
1332 		RETiRet;
1333 }
1334 
/* get error-only interleaved context */
1336 static rsRetVal
initializeErrorInterleavedConext(wrkrInstanceData_t * pWrkrData,context * ctx)1337 initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
1338 	DEFiRet;
1339 	ctx->statusCheckOnly=0;
1340 	fjson_object *errRoot=NULL;
1341 	fjson_object *interleaved =NULL;
1342 	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1343 	if((interleaved=fjson_object_new_array()) == NULL) {
1344 		fjson_object_put(errRoot);
1345 		ABORT_FINALIZE(RS_RET_ERR);
1346 	}
1347 
1348 
1349 	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1350 	fjson_object_object_add(errRoot,"response",interleaved);
1351 	ctx->errRoot = errRoot;
1352 	ctx->prepareErrorFileContent= &getDataErrorOnlyInterleaved;
1353 	finalize_it:
1354 		RETiRet;
1355 }
1356 
1357 /*get retry failures context*/
1358 static rsRetVal
initializeRetryFailuresContext(wrkrInstanceData_t * pWrkrData,context * ctx)1359 initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
1360 	DEFiRet;
1361 	ctx->statusCheckOnly=0;
1362 	fjson_object *errRoot=NULL;
1363 	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1364 
1365 
1366 	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1367 	ctx->errRoot = errRoot;
1368 	ctx->prepareErrorFileContent= &getDataRetryFailures;
1369 	CHKmalloc(ctx->jTokener = json_tokener_new());
1370 	finalize_it:
1371 		RETiRet;
1372 }
1373 
1374 
1375 /* write data error request/replies to separate error file
1376  * Note: we open the file but never close it before exit. If it
1377  * needs to be closed, HUP must be sent.
1378  */
ATTR_NONNULL()1379 static rsRetVal ATTR_NONNULL()
1380 writeDataError(wrkrInstanceData_t *const pWrkrData,
1381 	instanceData *const pData, fjson_object **const pReplyRoot,
1382 	uchar *const reqmsg)
1383 {
1384 	char *rendered = NULL;
1385 	size_t toWrite;
1386 	ssize_t wrRet;
1387 	sbool bMutLocked = 0;
1388 	context ctx;
1389 	ctx.errRoot=0;
1390 	ctx.writeOperation = pWrkrData->pData->writeOperation;
1391 	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
1392 	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
1393 	ctx.jTokener = NULL;
1394 	DEFiRet;
1395 
1396 	if(pData->errorFile == NULL) {
1397 		DBGPRINTF("omelasticsearch: no local error logger defined - "
1398 		          "ignoring ES error information\n");
1399 		FINALIZE;
1400 	}
1401 
1402 	pthread_mutex_lock(&pData->mutErrFile);
1403 	bMutLocked = 1;
1404 
1405 	DBGPRINTF("omelasticsearch: error file mode: erroronly='%d' errorInterleaved='%d'\n",
1406 		pData->errorOnly, pData->interleaved);
1407 
1408 	if(pData->interleaved ==0 && pData->errorOnly ==0)/*default write*/
1409 	{
1410 		if(getDataErrorDefault(pWrkrData,pReplyRoot, reqmsg, &rendered) != RS_RET_OK) {
1411 			ABORT_FINALIZE(RS_RET_ERR);
1412 		}
1413 	} else {
1414 		/*get correct context.*/
1415 		if(pData->interleaved && pData->errorOnly)
1416 		{
1417 			if(initializeErrorInterleavedConext(pWrkrData, &ctx) != RS_RET_OK) {
1418 				DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
1419 				ABORT_FINALIZE(RS_RET_ERR);
1420 			}
1421 
1422 		} else if(pData->errorOnly) {
1423 			if(initializeErrorOnlyConext(pWrkrData, &ctx) != RS_RET_OK) {
1424 
1425 				DBGPRINTF("omelasticsearch: error initializing error only context.\n");
1426 				ABORT_FINALIZE(RS_RET_ERR);
1427 			}
1428 		} else if(pData->interleaved) {
1429 			if(initializeInterleavedConext(pWrkrData, &ctx) != RS_RET_OK) {
1430 				DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
1431 				ABORT_FINALIZE(RS_RET_ERR);
1432 			}
1433 		} else if(pData->retryFailures) {
1434 			if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
1435 				DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
1436 				ABORT_FINALIZE(RS_RET_ERR);
1437 			}
1438 		} else {
1439 			DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
1440 			ABORT_FINALIZE(RS_RET_ERR);
1441 		}
1442 
1443 		/*execute context*/
1444 		if(parseRequestAndResponseForContext(pWrkrData, pReplyRoot, reqmsg, &ctx)!= RS_RET_OK) {
1445 			DBGPRINTF("omelasticsearch: error creating file content.\n");
1446 			ABORT_FINALIZE(RS_RET_ERR);
1447 		}
1448 		CHKmalloc(rendered = strdup((char*)fjson_object_to_json_string(ctx.errRoot)));
1449 	}
1450 
1451 
1452 	if(pData->fdErrFile == -1) {
1453 		pData->fdErrFile = open((char*)pData->errorFile,
1454 					O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
1455 					S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
1456 		if(pData->fdErrFile == -1) {
1457 			LogError(errno, RS_RET_ERR, "omelasticsearch: error opening error file %s",
1458 				pData->errorFile);
1459 			ABORT_FINALIZE(RS_RET_ERR);
1460 		}
1461 	}
1462 
1463 	/* we do not do real error-handling on the err file, as this finally complicates
1464 	 * things way to much.
1465 	 */
1466 	DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered);
1467 	toWrite = strlen(rendered) + 1;
1468 	/* Note: we overwrite the '\0' terminator with '\n' -- so we avoid
1469 	 * caling malloc() -- write() does NOT need '\0'!
1470 	 */
1471 	rendered[toWrite-1] = '\n'; /* NO LONGER A STRING! */
1472 	wrRet = write(pData->fdErrFile, rendered, toWrite);
1473 	if(wrRet != (ssize_t) toWrite) {
1474 		LogError(errno, RS_RET_IO_ERROR,
1475 			"omelasticsearch: error writing error file %s, write returned %lld",
1476 			pData->errorFile, (long long) wrRet);
1477 	}
1478 
1479 finalize_it:
1480 	if(bMutLocked)
1481 		pthread_mutex_unlock(&pData->mutErrFile);
1482 	free(rendered);
1483 	fjson_object_put(ctx.errRoot);
1484 	if (ctx.jTokener)
1485 		json_tokener_free(ctx.jTokener);
1486 	RETiRet;
1487 }
1488 
1489 
1490 static rsRetVal
checkResultBulkmode(wrkrInstanceData_t * pWrkrData,fjson_object * root,uchar * reqmsg)1491 checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
1492 {
1493 	DEFiRet;
1494 	context ctx;
1495 	ctx.errRoot = 0;
1496 	ctx.writeOperation = pWrkrData->pData->writeOperation;
1497 	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
1498 	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
1499 	ctx.statusCheckOnly=1;
1500 	ctx.jTokener = NULL;
1501 	if (pWrkrData->pData->retryFailures) {
1502 		ctx.statusCheckOnly=0;
1503 		CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
1504 	}
1505 	if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
1506 		DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
1507 		ABORT_FINALIZE(RS_RET_DATAFAIL);
1508 	}
1509 
1510 finalize_it:
1511 	fjson_object_put(ctx.errRoot);
1512 	if (ctx.jTokener)
1513 		json_tokener_free(ctx.jTokener);
1514 	RETiRet;
1515 }
1516 
1517 
1518 static rsRetVal
checkResult(wrkrInstanceData_t * pWrkrData,uchar * reqmsg)1519 checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
1520 {
1521 	fjson_object *root;
1522 	fjson_object *status;
1523 	DEFiRet;
1524 
1525 	root = fjson_tokener_parse(pWrkrData->reply);
1526 	if(root == NULL) {
1527 		LogMsg(0, RS_RET_ERR, LOG_WARNING,
1528 			"omelasticsearch: could not parse JSON result");
1529 		ABORT_FINALIZE(RS_RET_ERR);
1530 	}
1531 
1532 	if(pWrkrData->pData->bulkmode) {
1533 		iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
1534 	} else {
1535 		if(fjson_object_object_get_ex(root, "status", &status)) {
1536 			iRet = RS_RET_DATAFAIL;
1537 		}
1538 	}
1539 
1540 	/* Note: we ignore errors writing the error file, as we cannot handle
1541 	 * these in any case.
1542 	 */
1543 	if(iRet == RS_RET_DATAFAIL) {
1544 		STATSCOUNTER_INC(indexESFail, mutIndexESFail);
1545 		writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg);
1546 		iRet = RS_RET_OK; /* we have handled the problem! */
1547 	}
1548 
1549 finalize_it:
1550 	if(root != NULL)
1551 		fjson_object_put(root);
1552 	if(iRet != RS_RET_OK) {
1553 		STATSCOUNTER_INC(indexESFail, mutIndexESFail);
1554 	}
1555 	RETiRet;
1556 }
1557 
ATTR_NONNULL()1558 static void ATTR_NONNULL()
1559 initializeBatch(wrkrInstanceData_t *pWrkrData)
1560 {
1561 	es_emptyStr(pWrkrData->batch.data);
1562 	pWrkrData->batch.nmemb = 0;
1563 }
1564 
1565 static rsRetVal ATTR_NONNULL(1, 2)
curlPost(wrkrInstanceData_t * pWrkrData,uchar * message,int msglen,uchar ** tpls,const int nmsgs)1566 curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, const int nmsgs)
1567 {
1568 	CURLcode code;
1569 	CURL *const curl = pWrkrData->curlPostHandle;
1570 	char errbuf[CURL_ERROR_SIZE] = "";
1571 	DEFiRet;
1572 
1573 	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1574 
1575 	if ((pWrkrData->pData->rebindInterval > -1) &&
1576 		(pWrkrData->nOperations > pWrkrData->pData->rebindInterval)) {
1577 		curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 1);
1578 		pWrkrData->nOperations = 0;
1579 		STATSCOUNTER_INC(rebinds, mutRebinds);
1580 	} else {
1581 		/* by default, reuse existing connections */
1582 		curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 0);
1583 	}
1584 	if ((pWrkrData->pData->rebindInterval > -1) &&
1585 		(pWrkrData->nOperations == pWrkrData->pData->rebindInterval)) {
1586 		curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1);
1587 	} else {
1588 		curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 0);
1589 	}
1590 
1591 	if(pWrkrData->pData->numServers > 1) {
1592 		/* needs to be called to support ES HA feature */
1593 		CHKiRet(checkConn(pWrkrData));
1594 	}
1595 	pWrkrData->replyLen = 0;
1596 	CHKiRet(setPostURL(pWrkrData, tpls));
1597 
1598 	curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
1599 	curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
1600 	curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf);
1601 	code = curl_easy_perform(curl);
1602 	DBGPRINTF("curl returned %lld\n", (long long) code);
1603 	if (code != CURLE_OK && code != CURLE_HTTP_RETURNED_ERROR) {
1604 		STATSCOUNTER_INC(indexHTTPReqFail, mutIndexHTTPReqFail);
1605 		indexHTTPFail += nmsgs;
1606 		LogError(0, RS_RET_SUSPENDED,
1607 			"omelasticsearch: we are suspending ourselfs due "
1608 			"to server failure %lld: %s", (long long) code, errbuf);
1609 		ABORT_FINALIZE(RS_RET_SUSPENDED);
1610 	}
1611 
1612 	if (pWrkrData->pData->rebindInterval > -1)
1613 		pWrkrData->nOperations++;
1614 
1615 	if(pWrkrData->reply == NULL) {
1616 		DBGPRINTF("omelasticsearch: pWrkrData reply==NULL, replyLen = '%d'\n",
1617 			pWrkrData->replyLen);
1618 	} else {
1619 		DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen);
1620 		if(pWrkrData->replyLen > 0) {
1621 			pWrkrData->reply[pWrkrData->replyLen] = '\0';
1622 			/* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */
1623 		}
1624 		DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply);
1625 		CHKiRet(checkResult(pWrkrData, message));
1626 	}
1627 
1628 finalize_it:
1629 	incrementServerIndex(pWrkrData);
1630 	RETiRet;
1631 }
1632 
1633 static rsRetVal
submitBatch(wrkrInstanceData_t * pWrkrData)1634 submitBatch(wrkrInstanceData_t *pWrkrData)
1635 {
1636 	char *cstr = NULL;
1637 	DEFiRet;
1638 
1639 	cstr = es_str2cstr(pWrkrData->batch.data, NULL);
1640 	dbgprintf("omelasticsearch: submitBatch, batch: '%s'\n", cstr);
1641 
1642 	CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb));
1643 
1644 finalize_it:
1645 	free(cstr);
1646 	RETiRet;
1647 }
1648 
1649 BEGINbeginTransaction
1650 CODESTARTbeginTransaction
1651 	if(!pWrkrData->pData->bulkmode) {
1652 		FINALIZE;
1653 	}
1654 
1655 	initializeBatch(pWrkrData);
1656 finalize_it:
1657 ENDbeginTransaction
1658 
1659 BEGINdoAction
1660 CODESTARTdoAction
1661 	STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
1662 
1663 	if(pWrkrData->pData->bulkmode) {
1664 		const size_t nBytes = computeMessageSize(pWrkrData, ppString[0], ppString);
1665 
1666 		/* If max bytes is set and this next message will put us over the limit,
1667 		* submit the current buffer and reset */
1668 		if(pWrkrData->pData->maxbytes > 0
1669 			&& es_strlen(pWrkrData->batch.data) + nBytes > pWrkrData->pData->maxbytes ) {
1670 			dbgprintf("omelasticsearch: maxbytes limit reached, submitting partial "
1671 			"batch of %d elements.\n", pWrkrData->batch.nmemb);
1672 			CHKiRet(submitBatch(pWrkrData));
1673 			initializeBatch(pWrkrData);
1674 		}
1675 		CHKiRet(buildBatch(pWrkrData, ppString[0], ppString));
1676 
1677 		/* If there is only one item in the batch, all previous items have been
1678 	 	 * submitted or this is the first item for this transaction. Return previous
1679 		 * committed so that all items leading up to the current (exclusive)
1680 		 * are not replayed should a failure occur anywhere else in the transaction. */
1681 		iRet = pWrkrData->batch.nmemb == 1 ? RS_RET_PREVIOUS_COMMITTED : RS_RET_DEFER_COMMIT;
1682 	} else {
1683 		CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]),
1684 		                 ppString, 1));
1685 	}
1686 finalize_it:
1687 ENDdoAction
1688 
1689 
1690 BEGINendTransaction
1691 CODESTARTendTransaction
1692 	/* End Transaction only if batch data is not empty */
1693 	if (pWrkrData->batch.data != NULL && pWrkrData->batch.nmemb > 0) {
1694 		CHKiRet(submitBatch(pWrkrData));
1695 	} else {
1696 		dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, "
1697 			"nothing to send. \n");
1698 	}
1699 finalize_it:
1700 ENDendTransaction
1701 
1702 static rsRetVal
computeAuthHeader(char * uid,char * pwd,uchar ** authBuf)1703 computeAuthHeader(char* uid, char* pwd, uchar** authBuf) {
1704 	int r;
1705 	DEFiRet;
1706 
1707 	es_str_t* auth = es_newStr(1024);
1708 	if (auth == NULL) {
1709 		LogError(0, RS_RET_OUT_OF_MEMORY,
1710 			"omelasticsearch: failed to allocate es_str auth for auth header construction");
1711 		ABORT_FINALIZE(RS_RET_ERR);
1712 	}
1713 
1714 	r = es_addBuf(&auth, uid, strlen(uid));
1715 	if(r == 0) r = es_addChar(&auth, ':');
1716 	if(r == 0 && pwd != NULL) r = es_addBuf(&auth, pwd, strlen(pwd));
1717 	if(r == 0) *authBuf = (uchar*) es_str2cstr(auth, NULL);
1718 
1719 	if (r != 0 || *authBuf == NULL) {
1720 		LogError(0, RS_RET_ERR, "omelasticsearch: failed to build auth header\n");
1721 		ABORT_FINALIZE(RS_RET_ERR);
1722 	}
1723 
1724 finalize_it:
1725 	if (auth != NULL)
1726 		es_deleteStr(auth);
1727 	RETiRet;
1728 }
1729 
ATTR_NONNULL()1730 static void ATTR_NONNULL()
1731 curlSetupCommon(wrkrInstanceData_t *const pWrkrData, CURL *const handle)
1732 {
1733 	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1734 	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, pWrkrData->curlHeader);
1735 	curl_easy_setopt(handle, CURLOPT_NOSIGNAL, TRUE);
1736 	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
1737 	curl_easy_setopt(handle, CURLOPT_WRITEDATA, pWrkrData);
1738 	if(pWrkrData->pData->allowUnsignedCerts)
1739 		curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, FALSE);
1740 	if(pWrkrData->pData->skipVerifyHost)
1741 		curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, FALSE);
1742 	if(pWrkrData->pData->authBuf != NULL) {
1743 		curl_easy_setopt(handle, CURLOPT_USERPWD, pWrkrData->pData->authBuf);
1744 		curl_easy_setopt(handle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
1745 	}
1746 	if(pWrkrData->pData->caCertFile)
1747 		curl_easy_setopt(handle, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
1748 	if(pWrkrData->pData->myCertFile)
1749 		curl_easy_setopt(handle, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile);
1750 	if(pWrkrData->pData->myPrivKeyFile)
1751 		curl_easy_setopt(handle, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile);
1752 	/* uncomment for in-dept debuggung:
1753 	curl_easy_setopt(handle, CURLOPT_VERBOSE, TRUE); */
1754 }
1755 
ATTR_NONNULL()1756 static void ATTR_NONNULL()
1757 curlCheckConnSetup(wrkrInstanceData_t *const pWrkrData)
1758 {
1759 	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1760 	curlSetupCommon(pWrkrData, pWrkrData->curlCheckConnHandle);
1761 	curl_easy_setopt(pWrkrData->curlCheckConnHandle,
1762 		CURLOPT_TIMEOUT_MS, pWrkrData->pData->healthCheckTimeout);
1763 }
1764 
1765 static void ATTR_NONNULL(1)
curlPostSetup(wrkrInstanceData_t * const pWrkrData)1766 curlPostSetup(wrkrInstanceData_t *const pWrkrData)
1767 {
1768 	PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1769 	curlSetupCommon(pWrkrData, pWrkrData->curlPostHandle);
1770 	curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_POST, 1);
1771 }
1772 
1773 #define CONTENT_JSON "Content-Type: application/json; charset=utf-8"
1774 
ATTR_NONNULL()1775 static rsRetVal ATTR_NONNULL()
1776 curlSetup(wrkrInstanceData_t *const pWrkrData)
1777 {
1778 	DEFiRet;
1779 	pWrkrData->curlHeader = curl_slist_append(NULL, CONTENT_JSON);
1780 	CHKmalloc(pWrkrData->curlPostHandle = curl_easy_init());;
1781 	curlPostSetup(pWrkrData);
1782 
1783 	CHKmalloc(pWrkrData->curlCheckConnHandle = curl_easy_init());
1784 	curlCheckConnSetup(pWrkrData);
1785 
1786 finalize_it:
1787 	if(iRet != RS_RET_OK && pWrkrData->curlPostHandle != NULL) {
1788 		curl_easy_cleanup(pWrkrData->curlPostHandle);
1789 		pWrkrData->curlPostHandle = NULL;
1790 	}
1791 	RETiRet;
1792 }
1793 
ATTR_NONNULL()1794 static void ATTR_NONNULL()
1795 setInstParamDefaults(instanceData *const pData)
1796 {
1797 	pData->serverBaseUrls = NULL;
1798 	pData->defaultPort = 9200;
1799 	pData->healthCheckTimeout = 3500;
1800 	pData->uid = NULL;
1801 	pData->pwd = NULL;
1802 	pData->authBuf = NULL;
1803 	pData->searchIndex = NULL;
1804 	pData->searchType = NULL;
1805 	pData->pipelineName = NULL;
1806 	pData->dynPipelineName = 0;
1807 	pData->skipPipelineIfEmpty = 0;
1808 	pData->parent = NULL;
1809 	pData->timeout = NULL;
1810 	pData->dynSrchIdx = 0;
1811 	pData->dynSrchType = 0;
1812 	pData->dynParent = 0;
1813 	pData->useHttps = 0;
1814 	pData->bulkmode = 0;
1815 	pData->maxbytes = 104857600; //100 MB Is the default max message size that ships with ElasticSearch
1816 	pData->allowUnsignedCerts = 0;
1817 	pData->skipVerifyHost = 0;
1818 	pData->tplName = NULL;
1819 	pData->errorFile = NULL;
1820 	pData->errorOnly=0;
1821 	pData->interleaved=0;
1822 	pData->dynBulkId= 0;
1823 	pData->bulkId = NULL;
1824 	pData->caCertFile = NULL;
1825 	pData->myCertFile = NULL;
1826 	pData->myPrivKeyFile = NULL;
1827 	pData->writeOperation = ES_WRITE_INDEX;
1828 	pData->retryFailures = 0;
1829 	pData->ratelimitBurst = 20000;
1830 	pData->ratelimitInterval = 600;
1831 	pData->ratelimiter = NULL;
1832 	pData->retryRulesetName = NULL;
1833 	pData->retryRuleset = NULL;
1834 	pData->rebindInterval = DEFAULT_REBIND_INTERVAL;
1835 }
1836 
1837 BEGINnewActInst
1838 	struct cnfparamvals *pvals;
1839 	char* serverParam = NULL;
1840 	struct cnfarray* servers = NULL;
1841 	int i;
1842 	int iNumTpls;
1843 	FILE *fp;
1844 	char errStr[1024];
1845 CODESTARTnewActInst
1846 	if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
1847 		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
1848 	}
1849 
1850 	CHKiRet(createInstance(&pData));
1851 	setInstParamDefaults(pData);
1852 
1853 	for(i = 0 ; i < actpblk.nParams ; ++i) {
1854 		if(!pvals[i].bUsed)
1855 			continue;
1856 		if(!strcmp(actpblk.descr[i].name, "server")) {
1857 			servers = pvals[i].val.d.ar;
1858 		} else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
1859 			pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1860 		} else if(!strcmp(actpblk.descr[i].name, "erroronly")) {
1861 			pData->errorOnly = pvals[i].val.d.n;
1862 		} else if(!strcmp(actpblk.descr[i].name, "interleaved")) {
1863 			pData->interleaved = pvals[i].val.d.n;
1864 		} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
1865 			pData->defaultPort = (int) pvals[i].val.d.n;
1866 		} else if(!strcmp(actpblk.descr[i].name, "healthchecktimeout")) {
1867 			pData->healthCheckTimeout = (long) pvals[i].val.d.n;
1868 		} else if(!strcmp(actpblk.descr[i].name, "uid")) {
1869 			pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1870 		} else if(!strcmp(actpblk.descr[i].name, "pwd")) {
1871 			pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1872 		} else if(!strcmp(actpblk.descr[i].name, "searchindex")) {
1873 			pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1874 		} else if(!strcmp(actpblk.descr[i].name, "searchtype")) {
1875 			pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1876 		} else if(!strcmp(actpblk.descr[i].name, "pipelinename")) {
1877 			pData->pipelineName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1878 		} else if(!strcmp(actpblk.descr[i].name, "dynpipelinename")) {
1879 			pData->dynPipelineName = pvals[i].val.d.n;
1880 		} else if(!strcmp(actpblk.descr[i].name, "skippipelineifempty")) {
1881 			pData->skipPipelineIfEmpty = pvals[i].val.d.n;
1882 		} else if(!strcmp(actpblk.descr[i].name, "parent")) {
1883 			pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1884 		} else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) {
1885 			pData->dynSrchIdx = pvals[i].val.d.n;
1886 		} else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) {
1887 			pData->dynSrchType = pvals[i].val.d.n;
1888 		} else if(!strcmp(actpblk.descr[i].name, "dynparent")) {
1889 			pData->dynParent = pvals[i].val.d.n;
1890 		} else if(!strcmp(actpblk.descr[i].name, "bulkmode")) {
1891 			pData->bulkmode = pvals[i].val.d.n;
1892 		} else if(!strcmp(actpblk.descr[i].name, "maxbytes")) {
1893 			pData->maxbytes = (size_t) pvals[i].val.d.n;
1894 		} else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
1895 			pData->allowUnsignedCerts = pvals[i].val.d.n;
1896 		} else if(!strcmp(actpblk.descr[i].name, "skipverifyhost")) {
1897 			pData->skipVerifyHost = pvals[i].val.d.n;
1898 		} else if(!strcmp(actpblk.descr[i].name, "timeout")) {
1899 			pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1900 		} else if(!strcmp(actpblk.descr[i].name, "usehttps")) {
1901 			pData->useHttps = pvals[i].val.d.n;
1902 		} else if(!strcmp(actpblk.descr[i].name, "template")) {
1903 			pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1904 		} else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) {
1905 			pData->dynBulkId = pvals[i].val.d.n;
1906 		} else if(!strcmp(actpblk.descr[i].name, "bulkid")) {
1907 			pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1908 		} else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
1909 			pData->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1910 			fp = fopen((const char*)pData->caCertFile, "r");
1911 			if(fp == NULL) {
1912 				rs_strerror_r(errno, errStr, sizeof(errStr));
1913 				LogError(0, RS_RET_NO_FILE_ACCESS,
1914 						"error: 'tls.cacert' file %s couldn't be accessed: %s\n",
1915 						pData->caCertFile, errStr);
1916 			} else {
1917 				fclose(fp);
1918 			}
1919 		} else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
1920 			pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1921 			fp = fopen((const char*)pData->myCertFile, "r");
1922 			if(fp == NULL) {
1923 				rs_strerror_r(errno, errStr, sizeof(errStr));
1924 				LogError(0, RS_RET_NO_FILE_ACCESS,
1925 						"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
1926 						pData->myCertFile, errStr);
1927 			} else {
1928 				fclose(fp);
1929 			}
1930 		} else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
1931 			pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1932 			fp = fopen((const char*)pData->myPrivKeyFile, "r");
1933 			if(fp == NULL) {
1934 				rs_strerror_r(errno, errStr, sizeof(errStr));
1935 				LogError(0, RS_RET_NO_FILE_ACCESS,
1936 						"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
1937 						pData->myPrivKeyFile, errStr);
1938 			} else {
1939 				fclose(fp);
1940 			}
1941 		} else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
1942 			char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
1943 			if (writeop && !strcmp(writeop, "create")) {
1944 				pData->writeOperation = ES_WRITE_CREATE;
1945 			} else if (writeop && !strcmp(writeop, "index")) {
1946 				pData->writeOperation = ES_WRITE_INDEX;
1947 			} else if (writeop) {
1948 				LogError(0, RS_RET_CONFIG_ERROR,
1949 					"omelasticsearch: invalid value '%s' for writeoperation: "
1950 					"must be one of 'index' or 'create' - using default value 'index'", writeop);
1951 				pData->writeOperation = ES_WRITE_INDEX;
1952 			}
1953 			free(writeop);
1954 		} else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
1955 			pData->retryFailures = pvals[i].val.d.n;
1956 		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
1957 			pData->ratelimitBurst = (unsigned int) pvals[i].val.d.n;
1958 		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
1959 			pData->ratelimitInterval = (unsigned int) pvals[i].val.d.n;
1960 		} else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
1961 			pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1962 		} else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) {
1963 			pData->rebindInterval = (int) pvals[i].val.d.n;
1964 		} else {
1965 			LogError(0, RS_RET_INTERNAL_ERROR, "omelasticsearch: program error, "
1966 				"non-handled param '%s'", actpblk.descr[i].name);
1967 		}
1968 	}
1969 
1970 	if(pData->pwd != NULL && pData->uid == NULL) {
1971 		LogError(0, RS_RET_UID_MISSING,
1972 			"omelasticsearch: password is provided, but no uid "
1973 			"- action definition invalid");
1974 		ABORT_FINALIZE(RS_RET_UID_MISSING);
1975 	}
1976 	if(pData->dynSrchIdx && pData->searchIndex == NULL) {
1977 		LogError(0, RS_RET_CONFIG_ERROR,
1978 			"omelasticsearch: requested dynamic search index, but no "
1979 			"name for index template given - action definition invalid");
1980 		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1981 	}
1982 	if(pData->dynSrchType && pData->searchType == NULL) {
1983 		LogError(0, RS_RET_CONFIG_ERROR,
1984 			"omelasticsearch: requested dynamic search type, but no "
1985 			"name for type template given - action definition invalid");
1986 		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1987 	}
1988 	if(pData->dynParent && pData->parent == NULL) {
1989 		LogError(0, RS_RET_CONFIG_ERROR,
1990 			"omelasticsearch: requested dynamic parent, but no "
1991 			"name for parent template given - action definition invalid");
1992 		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1993 	}
1994 	if(pData->dynBulkId && pData->bulkId == NULL) {
1995 		LogError(0, RS_RET_CONFIG_ERROR,
1996 			"omelasticsearch: requested dynamic bulkid, but no "
1997 			"name for bulkid template given - action definition invalid");
1998 		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1999 	}
2000 	if(pData->dynPipelineName && pData->pipelineName == NULL) {
2001 		LogError(0, RS_RET_CONFIG_ERROR,
2002 			"omelasticsearch: requested dynamic pipeline name, but no "
2003 			"name for pipelineName template given - action definition invalid");
2004 		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
2005 	}
2006 
2007 	if (pData->uid != NULL)
2008 		CHKiRet(computeAuthHeader((char*) pData->uid, (char*) pData->pwd, &pData->authBuf));
2009 
2010 	iNumTpls = 1;
2011 	if(pData->dynSrchIdx) ++iNumTpls;
2012 	if(pData->dynSrchType) ++iNumTpls;
2013 	if(pData->dynParent) ++iNumTpls;
2014 	if(pData->dynBulkId) ++iNumTpls;
2015 	if(pData->dynPipelineName) ++iNumTpls;
2016 	DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
2017 	CODE_STD_STRING_REQUESTnewActInst(iNumTpls)
2018 
2019 	CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
2020 					    " StdJSONFmt" : (char*)pData->tplName),
2021 		OMSR_NO_RQD_TPL_OPTS));
2022 
2023 
2024 	/* we need to request additional templates. If we have a dynamic search index,
2025 	 * it will always be string 1. Type may be 1 or 2, depending on whether search
2026 	 * index is dynamic as well. Rule needs to be followed throughout the module.
2027 	 */
2028 	iNumTpls = 1;
2029 	if(pData->dynSrchIdx) {
2030 		CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchIndex),
2031 			OMSR_NO_RQD_TPL_OPTS));
2032 		++iNumTpls;
2033 	}
2034 	if(pData->dynSrchType) {
2035 		CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchType),
2036 			OMSR_NO_RQD_TPL_OPTS));
2037 		++iNumTpls;
2038 	}
2039 	if(pData->dynParent) {
2040 		CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->parent),
2041 			OMSR_NO_RQD_TPL_OPTS));
2042 		++iNumTpls;
2043 	}
2044 	if(pData->dynBulkId) {
2045 		CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->bulkId),
2046 			OMSR_NO_RQD_TPL_OPTS));
2047 		++iNumTpls;
2048 	}
2049 	if(pData->dynPipelineName) {
2050 		CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->pipelineName),
2051 			OMSR_NO_RQD_TPL_OPTS));
2052 		++iNumTpls;
2053 	}
2054 
2055 
2056 	if (servers != NULL) {
2057 		pData->numServers = servers->nmemb;
2058 		pData->serverBaseUrls = malloc(servers->nmemb * sizeof(uchar*));
2059 		if (pData->serverBaseUrls == NULL) {
2060 			LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer "
2061 					"for ElasticSearch server configuration.");
2062 			ABORT_FINALIZE(RS_RET_ERR);
2063 		}
2064 
2065 		for(i = 0 ; i < servers->nmemb ; ++i) {
2066 			serverParam = es_str2cstr(servers->arr[i], NULL);
2067 			if (serverParam == NULL) {
2068 				LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer "
2069 					"for ElasticSearch server configuration.");
2070 				ABORT_FINALIZE(RS_RET_ERR);
2071 			}
2072 			/* Remove a trailing slash if it exists */
2073 			const size_t serverParamLastChar = strlen(serverParam)-1;
2074 			if (serverParam[serverParamLastChar] == '/') {
2075 				serverParam[serverParamLastChar] = '\0';
2076 			}
2077 			CHKiRet(computeBaseUrl(serverParam, pData->defaultPort, pData->useHttps,
2078 				pData->serverBaseUrls + i));
2079 			free(serverParam);
2080 			serverParam = NULL;
2081 		}
2082 	} else {
2083 		LogMsg(0, RS_RET_OK, LOG_WARNING,
2084 			"omelasticsearch: No servers specified, using localhost");
2085 		pData->numServers = 1;
2086 		pData->serverBaseUrls = malloc(sizeof(uchar*));
2087 		if (pData->serverBaseUrls == NULL) {
2088 			LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer "
2089 					"for ElasticSearch server configuration.");
2090 			ABORT_FINALIZE(RS_RET_ERR);
2091 		}
2092 		CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls));
2093 	}
2094 
2095 	if(pData->searchIndex == NULL)
2096 		pData->searchIndex = (uchar*) strdup("system");
2097 	if(pData->searchType == NULL)
2098 		pData->searchType = (uchar*) strdup("events");
2099 
2100 	if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
2101 		LogError(0, RS_RET_CONFIG_ERROR,
2102 			"omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
2103 		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
2104 	}
2105 
2106 	if (pData->retryFailures) {
2107 		CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
2108 		ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
2109 		ratelimitSetNoTimeCache(pData->ratelimiter);
2110 	}
2111 
2112 	/* node created, let's add to list of instance configs for the module */
2113 	if(loadModConf->tail == NULL) {
2114 		loadModConf->tail = loadModConf->root = pData;
2115 	} else {
2116 		loadModConf->tail->next = pData;
2117 		loadModConf->tail = pData;
2118 	}
2119 
2120 CODE_STD_FINALIZERnewActInst
2121 	cnfparamvalsDestruct(pvals, &actpblk);
2122 	if (serverParam)
2123 		free(serverParam);
2124 ENDnewActInst
2125 
2126 
2127 BEGINbeginCnfLoad
2128 CODESTARTbeginCnfLoad
2129 	loadModConf = pModConf;
2130 	pModConf->pConf = pConf;
2131 	pModConf->root = pModConf->tail = NULL;
2132 ENDbeginCnfLoad
2133 
2134 
2135 BEGINendCnfLoad
2136 CODESTARTendCnfLoad
2137 	loadModConf = NULL; /* done loading */
2138 ENDendCnfLoad
2139 
2140 
2141 BEGINcheckCnf
2142 	instanceConf_t *inst;
2143 CODESTARTcheckCnf
2144 	for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
2145 		ruleset_t *pRuleset;
2146 		rsRetVal localRet;
2147 
2148 		if (inst->retryRulesetName) {
2149 			localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
2150 			if(localRet == RS_RET_NOT_FOUND) {
2151 				LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
2152 						"no retry ruleset will be used", inst->retryRulesetName);
2153 			} else {
2154 				inst->retryRuleset = pRuleset;
2155 			}
2156 		}
2157 	}
2158 ENDcheckCnf
2159 
2160 
/* Empty: this module performs no work at config-activation time. */
BEGINactivateCnf
CODESTARTactivateCnf
ENDactivateCnf


/* Empty: nothing module-wide is released when the config is freed. */
BEGINfreeCnf
CODESTARTfreeCnf
ENDfreeCnf
2169 
2170 
2171 BEGINdoHUP
2172 CODESTARTdoHUP
2173 	pthread_mutex_lock(&pData->mutErrFile);
2174 	if(pData->fdErrFile != -1) {
2175 		close(pData->fdErrFile);
2176 		pData->fdErrFile = -1;
2177 	}
2178 	pthread_mutex_unlock(&pData->mutErrFile);
2179 ENDdoHUP
2180 
2181 
2182 BEGINmodExit
2183 CODESTARTmodExit
2184 	if(pInputName != NULL)
2185 		prop.Destruct(&pInputName);
2186 	curl_global_cleanup();
2187 	statsobj.Destruct(&indexStats);
2188 	objRelease(statsobj, CORE_COMPONENT);
2189 	objRelease(prop, CORE_COMPONENT);
2190 	objRelease(ruleset, CORE_COMPONENT);
2191 ENDmodExit
2192 
/* Legacy selector-line action syntax is not supported by this module. */
NO_LEGACY_CONF_parseSelectorAct

/* Entry-point lookup used by the rsyslog core. Besides the standard output
 * module entry points, this module advertises:
 *  - config-v2 support (action()/module() parameter blocks)
 *  - a HUP handler
 *  - the transactional output interface
 */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_doHUP
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
2205 
2206 
/* Module initialization. Steps:
 *  - acquire the core object interfaces (statsobj, prop, ruleset)
 *  - initialize libcurl's global state
 *  - register the "omelasticsearch" statistics counters
 *  - create the pInputName property holding the string "omelasticsearch"
 * Any failure aborts module loading with the failing return code.
 */
BEGINmodInit()
CODESTARTmodInit
	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
	CHKiRet(objUse(statsobj, CORE_COMPONENT));
	CHKiRet(objUse(prop, CORE_COMPONENT));
	CHKiRet(objUse(ruleset, CORE_COMPONENT));

	/* libcurl's global state must exist before any easy handle is created;
	 * without it the module cannot send anything. modExit() undoes this with
	 * curl_global_cleanup(). */
	if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
		LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
		ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED);
	}

	/* support statistics gathering */
	/* Every counter below is a resettable integer counter. The string
	 * argument is the counter name; registration order is kept as-is. */
	CHKiRet(statsobj.Construct(&indexStats));
	CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch"));
	CHKiRet(statsobj.SetOrigin(indexStats, (uchar *)"omelasticsearch"));
	STATSCOUNTER_INIT(indexSubmit, mutIndexSubmit);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSubmit));
	STATSCOUNTER_INIT(indexHTTPFail, mutIndexHTTPFail);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPFail));
	STATSCOUNTER_INIT(indexHTTPReqFail, mutIndexHTTPReqFail);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.httprequests",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPReqFail));
	STATSCOUNTER_INIT(checkConnFail, mutCheckConnFail);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.checkConn",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &checkConnFail));
	STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
	STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
	STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
	STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
	STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
	STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
	STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
	STATSCOUNTER_INIT(rebinds, mutRebinds);
	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"rebinds",
		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &rebinds));
	CHKiRet(statsobj.ConstructFinalize(indexStats));
	/* NOTE(review): pInputName is presumably used as the input name of
	 * messages this module injects (e.g. for the retry ruleset); confirm. */
	CHKiRet(prop.Construct(&pInputName));
	CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
	CHKiRet(prop.ConstructFinalize(pInputName));
ENDmodInit
2265 
2266 /* vi:set ai:
2267  */
2268