1 /* omelasticsearch.c
2 * This is the http://www.elasticsearch.org/ output module.
3 *
4 * NOTE: read comments in module-template.h for more specifics!
5 *
6 * Copyright 2011 Nathan Scott.
7 * Copyright 2009-2021 Rainer Gerhards and Adiscon GmbH.
8 *
9 * This file is part of rsyslog.
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 * -or-
17 * see COPYING.ASL20 in the source distribution
18 *
19 * Unless required by applicable law or agreed to in writing, software
20 * distributed under the License is distributed on an "AS IS" BASIS,
21 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 * See the License for the specific language governing permissions and
23 * limitations under the License.
24 */
25 #include "config.h"
26 #include "rsyslog.h"
27 #include <stdio.h>
28 #include <stdarg.h>
29 #include <stdlib.h>
30 #include <memory.h>
31 #include <string.h>
32 #include <curl/curl.h>
33 #include <curl/easy.h>
34 #include <assert.h>
35 #include <signal.h>
36 #include <errno.h>
37 #include <time.h>
38 #include <sys/types.h>
39 #include <sys/stat.h>
40 #include <fcntl.h>
41 #if defined(__FreeBSD__)
42 #include <unistd.h>
43 #endif
44 #include <json.h>
45 #include "conf.h"
46 #include "syslogd-types.h"
47 #include "srUtils.h"
48 #include "template.h"
49 #include "module-template.h"
50 #include "errmsg.h"
51 #include "statsobj.h"
52 #include "cfsysline.h"
53 #include "unicode-helper.h"
54 #include "obj-types.h"
55 #include "ratelimit.h"
56 #include "ruleset.h"
57
58 #ifndef O_LARGEFILE
59 # define O_LARGEFILE 0
60 #endif
61
62 MODULE_TYPE_OUTPUT
63 MODULE_TYPE_NOKEEP
64 MODULE_CNFNAME("omelasticsearch")
65
66 /* internal structures */
67 DEF_OMOD_STATIC_DATA
68 DEFobjCurrIf(statsobj)
69 DEFobjCurrIf(prop)
70 DEFobjCurrIf(ruleset)
71
72 statsobj_t *indexStats;
73 STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
74 STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
75 STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
76 STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
77 STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
78 STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
79 STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
80 STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
81 STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
82 STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
83 STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
84 STATSCOUNTER_DEF(rebinds, mutRebinds)
85
86 static prop_t *pInputName = NULL;
87
88 # define META_STRT "{\"index\":{\"_index\": \""
89 # define META_STRT_CREATE "{\"create\":{\"_index\": \""
90 # define META_TYPE "\",\"_type\":\""
91 # define META_PIPELINE "\",\"pipeline\":\""
92 # define META_PARENT "\",\"_parent\":\""
93 # define META_ID "\", \"_id\":\""
94 # define META_END "\"}}\n"
95
96 typedef enum {
97 ES_WRITE_INDEX,
98 ES_WRITE_CREATE,
99 ES_WRITE_UPDATE, /* not supported */
100 ES_WRITE_UPSERT /* not supported */
101 } es_write_ops_t;
102
103 #define WRKR_DATA_TYPE_ES 0xBADF0001
104
105 #define DEFAULT_REBIND_INTERVAL -1
106
107 /* REST API for elasticsearch hits this URL:
108 * http://<hostName>:<restPort>/<searchIndex>/<searchType>
109 */
110 /* bulk API uses /_bulk */
111 typedef struct curl_slist HEADER;
112 typedef struct instanceConf_s {
113 int defaultPort;
114 int fdErrFile; /* error file fd or -1 if not open */
115 pthread_mutex_t mutErrFile;
116 uchar **serverBaseUrls;
117 int numServers;
118 long healthCheckTimeout;
119 uchar *uid;
120 uchar *pwd;
121 uchar *authBuf;
122 uchar *searchIndex;
123 uchar *searchType;
124 uchar *pipelineName;
125 sbool skipPipelineIfEmpty;
126 uchar *parent;
127 uchar *tplName;
128 uchar *timeout;
129 uchar *bulkId;
130 uchar *errorFile;
131 sbool errorOnly;
132 sbool interleaved;
133 sbool dynSrchIdx;
134 sbool dynSrchType;
135 sbool dynParent;
136 sbool dynBulkId;
137 sbool dynPipelineName;
138 sbool bulkmode;
139 size_t maxbytes;
140 sbool useHttps;
141 sbool allowUnsignedCerts;
142 sbool skipVerifyHost;
143 uchar *caCertFile;
144 uchar *myCertFile;
145 uchar *myPrivKeyFile;
146 es_write_ops_t writeOperation;
147 sbool retryFailures;
148 unsigned int ratelimitInterval;
149 unsigned int ratelimitBurst;
150 /* for retries */
151 ratelimit_t *ratelimiter;
152 uchar *retryRulesetName;
153 ruleset_t *retryRuleset;
154 int rebindInterval;
155 struct instanceConf_s *next;
156 } instanceData;
157
158 struct modConfData_s {
159 rsconf_t *pConf; /* our overall config object */
160 instanceConf_t *root, *tail;
161 };
162 static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */
163
164 typedef struct wrkrInstanceData {
165 PTR_ASSERT_DEF
166 instanceData *pData;
167 int serverIndex;
168 int replyLen;
169 size_t replyBufLen;
170 char *reply;
171 CURL *curlCheckConnHandle; /* libcurl session handle for checking the server connection */
172 CURL *curlPostHandle; /* libcurl session handle for posting data to the server */
173 HEADER *curlHeader; /* json POST request info */
174 uchar *restURL; /* last used URL for error reporting */
175 struct {
176 es_str_t *data;
177 int nmemb; /* number of messages in batch (for statistics counting) */
178 uchar *currTpl1;
179 uchar *currTpl2;
180 } batch;
181 int nOperations; /* counter used with rebindInterval */
182 } wrkrInstanceData_t;
183
184 /* tables for interfacing with the v6 config system */
185 /* action (instance) parameters */
186 static struct cnfparamdescr actpdescr[] = {
187 { "server", eCmdHdlrArray, 0 },
188 { "serverport", eCmdHdlrInt, 0 },
189 { "healthchecktimeout", eCmdHdlrInt, 0 },
190 { "uid", eCmdHdlrGetWord, 0 },
191 { "pwd", eCmdHdlrGetWord, 0 },
192 { "searchindex", eCmdHdlrGetWord, 0 },
193 { "searchtype", eCmdHdlrGetWord, 0 },
194 { "pipelinename", eCmdHdlrGetWord, 0 },
195 { "skippipelineifempty", eCmdHdlrBinary, 0 },
196 { "parent", eCmdHdlrGetWord, 0 },
197 { "dynsearchindex", eCmdHdlrBinary, 0 },
198 { "dynsearchtype", eCmdHdlrBinary, 0 },
199 { "dynparent", eCmdHdlrBinary, 0 },
200 { "bulkmode", eCmdHdlrBinary, 0 },
201 { "maxbytes", eCmdHdlrSize, 0 },
202 { "asyncrepl", eCmdHdlrGoneAway, 0 },
203 { "usehttps", eCmdHdlrBinary, 0 },
204 { "timeout", eCmdHdlrGetWord, 0 },
205 { "errorfile", eCmdHdlrGetWord, 0 },
206 { "erroronly", eCmdHdlrBinary, 0 },
207 { "interleaved", eCmdHdlrBinary, 0 },
208 { "template", eCmdHdlrGetWord, 0 },
209 { "dynbulkid", eCmdHdlrBinary, 0 },
210 { "dynpipelinename", eCmdHdlrBinary, 0 },
211 { "bulkid", eCmdHdlrGetWord, 0 },
212 { "allowunsignedcerts", eCmdHdlrBinary, 0 },
213 { "skipverifyhost", eCmdHdlrBinary, 0 },
214 { "tls.cacert", eCmdHdlrString, 0 },
215 { "tls.mycert", eCmdHdlrString, 0 },
216 { "tls.myprivkey", eCmdHdlrString, 0 },
217 { "writeoperation", eCmdHdlrGetWord, 0 },
218 { "retryfailures", eCmdHdlrBinary, 0 },
219 { "ratelimit.interval", eCmdHdlrInt, 0 },
220 { "ratelimit.burst", eCmdHdlrInt, 0 },
221 { "retryruleset", eCmdHdlrString, 0 },
222 { "rebindinterval", eCmdHdlrInt, 0 }
223 };
224 static struct cnfparamblk actpblk =
225 { CNFPARAMBLK_VERSION,
226 sizeof(actpdescr)/sizeof(struct cnfparamdescr),
227 actpdescr
228 };
229
230 static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData);
231
232 BEGINcreateInstance
233 CODESTARTcreateInstance
234 pData->fdErrFile = -1;
235 if(pthread_mutex_init(&pData->mutErrFile, NULL) != 0) {
236 LogError(errno, RS_RET_ERR, "omelasticsearch: cannot create "
237 "error file mutex, failing this action");
238 ABORT_FINALIZE(RS_RET_ERR);
239 }
240 pData->caCertFile = NULL;
241 pData->myCertFile = NULL;
242 pData->myPrivKeyFile = NULL;
243 pData->ratelimiter = NULL;
244 pData->retryRulesetName = NULL;
245 pData->retryRuleset = NULL;
246 pData->rebindInterval = DEFAULT_REBIND_INTERVAL;
247 finalize_it:
248 ENDcreateInstance
249
250 BEGINcreateWrkrInstance
251 CODESTARTcreateWrkrInstance
252 PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
253 pWrkrData->curlHeader = NULL;
254 pWrkrData->curlPostHandle = NULL;
255 pWrkrData->curlCheckConnHandle = NULL;
256 pWrkrData->serverIndex = 0;
257 pWrkrData->restURL = NULL;
258 if(pData->bulkmode) {
259 pWrkrData->batch.currTpl1 = NULL;
260 pWrkrData->batch.currTpl2 = NULL;
261 if((pWrkrData->batch.data = es_newStr(1024)) == NULL) {
262 LogError(0, RS_RET_OUT_OF_MEMORY,
263 "omelasticsearch: error creating batch string "
264 "turned off bulk mode\n");
265 pData->bulkmode = 0; /* at least it works */
266 }
267 }
268 pWrkrData->nOperations = 0;
269 pWrkrData->reply = NULL;
270 pWrkrData->replyLen = 0;
271 pWrkrData->replyBufLen = 0;
272 iRet = curlSetup(pWrkrData);
273 ENDcreateWrkrInstance
274
275 BEGINisCompatibleWithFeature
276 CODESTARTisCompatibleWithFeature
277 if(eFeat == sFEATURERepeatedMsgReduction)
278 iRet = RS_RET_OK;
279 ENDisCompatibleWithFeature
280
281 BEGINfreeInstance
282 int i;
283 CODESTARTfreeInstance
284 if(pData->fdErrFile != -1)
285 close(pData->fdErrFile);
286
287 if(loadModConf != NULL) {
288 /* we keep our instances in our own internal list - this also
289 * means we need to cleanup that list, else we cause grief.
290 */
291 instanceData *prev = NULL;
292 for(instanceData *inst = loadModConf->root ; inst != NULL ; inst = inst->next) {
293 if(inst == pData) {
294 if(loadModConf->tail == inst) {
295 loadModConf->tail = prev;
296 }
297 prev->next = inst->next;
298 /* no need to correct inst back to prev - we exit now! */
299 break;
300 } else {
301 prev = inst;
302 }
303 }
304 }
305
306 pthread_mutex_destroy(&pData->mutErrFile);
307 for(i = 0 ; i < pData->numServers ; ++i)
308 free(pData->serverBaseUrls[i]);
309 free(pData->serverBaseUrls);
310 free(pData->uid);
311 free(pData->pwd);
312 free(pData->authBuf);
313 free(pData->searchIndex);
314 free(pData->searchType);
315 free(pData->pipelineName);
316 free(pData->parent);
317 free(pData->tplName);
318 free(pData->timeout);
319 free(pData->errorFile);
320 free(pData->bulkId);
321 free(pData->caCertFile);
322 free(pData->myCertFile);
323 free(pData->myPrivKeyFile);
324 free(pData->retryRulesetName);
325 if (pData->ratelimiter != NULL)
326 ratelimitDestruct(pData->ratelimiter);
327 ENDfreeInstance
328
329 BEGINfreeWrkrInstance
330 CODESTARTfreeWrkrInstance
331 if(pWrkrData->curlHeader != NULL) {
332 curl_slist_free_all(pWrkrData->curlHeader);
333 pWrkrData->curlHeader = NULL;
334 }
335 if(pWrkrData->curlCheckConnHandle != NULL) {
336 curl_easy_cleanup(pWrkrData->curlCheckConnHandle);
337 pWrkrData->curlCheckConnHandle = NULL;
338 }
339 if(pWrkrData->curlPostHandle != NULL) {
340 curl_easy_cleanup(pWrkrData->curlPostHandle);
341 pWrkrData->curlPostHandle = NULL;
342 }
343 if (pWrkrData->restURL != NULL) {
344 free(pWrkrData->restURL);
345 pWrkrData->restURL = NULL;
346 }
347 es_deleteStr(pWrkrData->batch.data);
348 free(pWrkrData->reply);
349 ENDfreeWrkrInstance
350
351 BEGINdbgPrintInstInfo
352 int i;
353 CODESTARTdbgPrintInstInfo
354 dbgprintf("omelasticsearch\n");
355 dbgprintf("\ttemplate='%s'\n", pData->tplName);
356 dbgprintf("\tnumServers=%d\n", pData->numServers);
357 dbgprintf("\thealthCheckTimeout=%lu\n", pData->healthCheckTimeout);
358 dbgprintf("\tserverBaseUrls=");
359 for(i = 0 ; i < pData->numServers ; ++i)
360 dbgprintf("%c'%s'", i == 0 ? '[' : ' ', pData->serverBaseUrls[i]);
361 dbgprintf("]\n");
362 dbgprintf("\tdefaultPort=%d\n", pData->defaultPort);
363 dbgprintf("\tuid='%s'\n", pData->uid == NULL ? (uchar*)"(not configured)" : pData->uid);
364 dbgprintf("\tpwd=(%sconfigured)\n", pData->pwd == NULL ? "not " : "");
365 dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
366 dbgprintf("\tsearch type='%s'\n", pData->searchType);
367 dbgprintf("\tpipeline name='%s'\n", pData->pipelineName);
368 dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName);
369 dbgprintf("\tskipPipelineIfEmpty=%d\n", pData->skipPipelineIfEmpty);
370 dbgprintf("\tparent='%s'\n", pData->parent);
371 dbgprintf("\ttimeout='%s'\n", pData->timeout);
372 dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx);
373 dbgprintf("\tdynamic search type=%d\n", pData->dynSrchType);
374 dbgprintf("\tdynamic parent=%d\n", pData->dynParent);
375 dbgprintf("\tuse https=%d\n", pData->useHttps);
376 dbgprintf("\tbulkmode=%d\n", pData->bulkmode);
377 dbgprintf("\tmaxbytes=%zu\n", pData->maxbytes);
378 dbgprintf("\tallowUnsignedCerts=%d\n", pData->allowUnsignedCerts);
379 dbgprintf("\tskipVerifyHost=%d\n", pData->skipVerifyHost);
380 dbgprintf("\terrorfile='%s'\n", pData->errorFile == NULL ?
381 (uchar*)"(not configured)" : pData->errorFile);
382 dbgprintf("\terroronly=%d\n", pData->errorOnly);
383 dbgprintf("\tinterleaved=%d\n", pData->interleaved);
384 dbgprintf("\tdynbulkid=%d\n", pData->dynBulkId);
385 dbgprintf("\tbulkid='%s'\n", pData->bulkId);
386 dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
387 dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
388 dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
389 dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
390 dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
391 dbgprintf("\tratelimit.interval='%u'\n", pData->ratelimitInterval);
392 dbgprintf("\tratelimit.burst='%u'\n", pData->ratelimitBurst);
393 dbgprintf("\trebindinterval='%d'\n", pData->rebindInterval);
394 ENDdbgPrintInstInfo
395
396
397 /* elasticsearch POST result string ... useful for debugging */
398 static size_t
curlResult(void * const ptr,const size_t size,const size_t nmemb,void * const userdata)399 curlResult(void *const ptr, const size_t size, const size_t nmemb, void *const userdata)
400 {
401 const char *const p = (const char *)ptr;
402 wrkrInstanceData_t *const pWrkrData = (wrkrInstanceData_t*) userdata;
403 char *buf;
404 const size_t size_add = size*nmemb;
405 size_t newlen;
406 PTR_ASSERT_CHK(pWrkrData, WRKR_DATA_TYPE_ES);
407 newlen = pWrkrData->replyLen + size_add;
408 if(newlen + 1 > pWrkrData->replyBufLen) {
409 if((buf = realloc(pWrkrData->reply, pWrkrData->replyBufLen + size_add + 1)) == NULL) {
410 LogError(errno, RS_RET_ERR, "omelasticsearch: realloc failed in curlResult");
411 return 0; /* abort due to failure */
412 }
413 pWrkrData->replyBufLen += size_add + 1;
414 pWrkrData->reply = buf;
415 }
416 memcpy(pWrkrData->reply+pWrkrData->replyLen, p, size_add);
417 pWrkrData->replyLen = newlen;
418 return size_add;
419 }
420
421 /* Build basic URL part, which includes hostname and port as follows:
422 * http://hostname:port/ based on a server param
423 * Newly creates a cstr for this purpose.
424 * Note: serverParam MUST NOT end in '/' (caller must strip if it exists)
425 */
426 static rsRetVal
computeBaseUrl(const char * const serverParam,const int defaultPort,const sbool useHttps,uchar ** baseUrl)427 computeBaseUrl(const char*const serverParam,
428 const int defaultPort,
429 const sbool useHttps,
430 uchar **baseUrl)
431 {
432 # define SCHEME_HTTPS "https://"
433 # define SCHEME_HTTP "http://"
434
435 char portBuf[64];
436 int r = 0;
437 const char *host = serverParam;
438 DEFiRet;
439
440 assert(serverParam[strlen(serverParam)-1] != '/');
441
442 es_str_t *urlBuf = es_newStr(256);
443 if (urlBuf == NULL) {
444 LogError(0, RS_RET_OUT_OF_MEMORY,
445 "omelasticsearch: failed to allocate es_str urlBuf in computeBaseUrl");
446 ABORT_FINALIZE(RS_RET_ERR);
447 }
448
449 /* Find where the hostname/ip of the server starts. If the scheme is not specified
450 in the uri, start the buffer with a scheme corresponding to the useHttps parameter.
451 */
452 if (strcasestr(serverParam, SCHEME_HTTP))
453 host = serverParam + strlen(SCHEME_HTTP);
454 else if (strcasestr(serverParam, SCHEME_HTTPS))
455 host = serverParam + strlen(SCHEME_HTTPS);
456 else
457 r = useHttps ? es_addBuf(&urlBuf, SCHEME_HTTPS, sizeof(SCHEME_HTTPS)-1) :
458 es_addBuf(&urlBuf, SCHEME_HTTP, sizeof(SCHEME_HTTP)-1);
459
460 if (r == 0) r = es_addBuf(&urlBuf, (char *)serverParam, strlen(serverParam));
461 if (r == 0 && !strchr(host, ':')) {
462 snprintf(portBuf, sizeof(portBuf), ":%d", defaultPort);
463 r = es_addBuf(&urlBuf, portBuf, strlen(portBuf));
464 }
465 if (r == 0) r = es_addChar(&urlBuf, '/');
466 if (r == 0) *baseUrl = (uchar*) es_str2cstr(urlBuf, NULL);
467
468 if (r != 0 || baseUrl == NULL) {
469 LogError(0, RS_RET_ERR,
470 "omelasticsearch: error occurred computing baseUrl from server %s", serverParam);
471 ABORT_FINALIZE(RS_RET_ERR);
472 }
473 finalize_it:
474 if (urlBuf) {
475 es_deleteStr(urlBuf);
476 }
477 RETiRet;
478 }
479
480 static inline void
incrementServerIndex(wrkrInstanceData_t * pWrkrData)481 incrementServerIndex(wrkrInstanceData_t *pWrkrData)
482 {
483 pWrkrData->serverIndex = (pWrkrData->serverIndex + 1) % pWrkrData->pData->numServers;
484 }
485
486
487 /* checks if connection to ES can be established; also iterates over
488 * potential servers to support high availability (HA) feature. If it
489 * needs to switch server, will record new one in curl handle.
490 */
ATTR_NONNULL()491 static rsRetVal ATTR_NONNULL()
492 checkConn(wrkrInstanceData_t *const pWrkrData)
493 {
494 # define HEALTH_URI "_cat/health"
495 CURL *curl;
496 CURLcode res;
497 es_str_t *urlBuf;
498 char* healthUrl = NULL;
499 char* serverUrl;
500 int i;
501 int r;
502 DEFiRet;
503
504 pWrkrData->replyLen = 0;
505 curl = pWrkrData->curlCheckConnHandle;
506 urlBuf = es_newStr(256);
507 if (urlBuf == NULL) {
508 LogError(0, RS_RET_OUT_OF_MEMORY,
509 "omelasticsearch: unable to allocate buffer for health check uri.");
510 ABORT_FINALIZE(RS_RET_SUSPENDED);
511 }
512
513 for(i = 0; i < pWrkrData->pData->numServers; ++i) {
514 serverUrl = (char*) pWrkrData->pData->serverBaseUrls[pWrkrData->serverIndex];
515
516 es_emptyStr(urlBuf);
517 r = es_addBuf(&urlBuf, serverUrl, strlen(serverUrl));
518 if(r == 0) r = es_addBuf(&urlBuf, HEALTH_URI, sizeof(HEALTH_URI)-1);
519 if(r == 0) healthUrl = es_str2cstr(urlBuf, NULL);
520 if(r != 0 || healthUrl == NULL) {
521 LogError(0, RS_RET_OUT_OF_MEMORY,
522 "omelasticsearch: unable to allocate buffer for health check uri.");
523 ABORT_FINALIZE(RS_RET_SUSPENDED);
524 }
525
526 curl_easy_setopt(curl, CURLOPT_URL, healthUrl);
527 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curlResult);
528 res = curl_easy_perform(curl);
529 free(healthUrl);
530
531 if (res == CURLE_OK) {
532 DBGPRINTF("omelasticsearch: checkConn %s completed with success "
533 "on attempt %d\n", serverUrl, i);
534 ABORT_FINALIZE(RS_RET_OK);
535 }
536
537 DBGPRINTF("omelasticsearch: checkConn %s failed on attempt %d: %s\n",
538 serverUrl, i, curl_easy_strerror(res));
539 STATSCOUNTER_INC(checkConnFail, mutCheckConnFail);
540 incrementServerIndex(pWrkrData);
541 }
542
543 LogMsg(0, RS_RET_SUSPENDED, LOG_WARNING,
544 "omelasticsearch: checkConn failed after %d attempts.", i);
545 ABORT_FINALIZE(RS_RET_SUSPENDED);
546
547 finalize_it:
548 if(urlBuf != NULL)
549 es_deleteStr(urlBuf);
550 RETiRet;
551 }
552
553
554 BEGINtryResume
555 CODESTARTtryResume
556 DBGPRINTF("omelasticsearch: tryResume called\n");
557 iRet = checkConn(pWrkrData);
558 ENDtryResume
559
560
561 /* get the current index and type for this message */
562 static void ATTR_NONNULL(1)
getIndexTypeAndParent(const instanceData * const pData,uchar ** const tpls,uchar ** const srchIndex,uchar ** const srchType,uchar ** const parent,uchar ** const bulkId,uchar ** const pipelineName)563 getIndexTypeAndParent(const instanceData *const pData, uchar **const tpls,
564 uchar **const srchIndex, uchar **const srchType, uchar **const parent,
565 uchar **const bulkId, uchar **const pipelineName)
566 {
567 *srchIndex = pData->searchIndex;
568 *parent = pData->parent;
569 *srchType = pData->searchType;
570 *bulkId = pData->bulkId;
571 *pipelineName = pData->pipelineName;
572 if(tpls == NULL) {
573 goto done;
574 }
575
576 int iNumTpls = 1;
577 if(pData->dynSrchIdx) {
578 *srchIndex = tpls[iNumTpls];
579 ++iNumTpls;
580 }
581 if(pData->dynSrchType) {
582 *srchType = tpls[iNumTpls];
583 ++iNumTpls;
584 }
585 if(pData->dynParent) {
586 *parent = tpls[iNumTpls];
587 ++iNumTpls;
588 }
589 if(pData->dynBulkId) {
590 *bulkId = tpls[iNumTpls];
591 ++iNumTpls;
592 }
593 if(pData->dynPipelineName) {
594 *pipelineName = tpls[iNumTpls];
595 ++iNumTpls;
596 }
597
598 done:
599 assert(srchIndex != NULL);
600 assert(srchType != NULL);
601 return;
602 }
603
604
605 static rsRetVal ATTR_NONNULL(1)
setPostURL(wrkrInstanceData_t * const pWrkrData,uchar ** const tpls)606 setPostURL(wrkrInstanceData_t *const pWrkrData, uchar **const tpls)
607 {
608 uchar *searchIndex = NULL;
609 uchar *searchType;
610 uchar *pipelineName;
611 uchar *parent;
612 uchar *bulkId;
613 char* baseUrl;
614 es_str_t *url;
615 int r;
616 DEFiRet;
617 instanceData *const pData = pWrkrData->pData;
618 char separator;
619 const int bulkmode = pData->bulkmode;
620
621 baseUrl = (char*)pData->serverBaseUrls[pWrkrData->serverIndex];
622 url = es_newStrFromCStr(baseUrl, strlen(baseUrl));
623 if (url == NULL) {
624 LogError(0, RS_RET_OUT_OF_MEMORY,
625 "omelasticsearch: error allocating new estr for POST url.");
626 ABORT_FINALIZE(RS_RET_ERR);
627 }
628
629 separator = '?';
630
631 if(bulkmode) {
632 r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
633 parent = NULL;
634 } else {
635 getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
636 r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
637 if(r == 0) r = es_addChar(&url, '/');
638 if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
639 if(pipelineName != NULL && (!pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
640 if(r == 0) r = es_addChar(&url, separator);
641 if(r == 0) r = es_addBuf(&url, "pipeline=", sizeof("pipeline=")-1);
642 if(r == 0) r = es_addBuf(&url, (char*)pipelineName, ustrlen(pipelineName));
643 separator = '&';
644 }
645 }
646
647 if(pData->timeout != NULL) {
648 if(r == 0) r = es_addChar(&url, separator);
649 if(r == 0) r = es_addBuf(&url, "timeout=", sizeof("timeout=")-1);
650 if(r == 0) r = es_addBuf(&url, (char*)pData->timeout, ustrlen(pData->timeout));
651 separator = '&';
652 }
653
654 if(parent != NULL) {
655 if(r == 0) r = es_addChar(&url, separator);
656 if(r == 0) r = es_addBuf(&url, "parent=", sizeof("parent=")-1);
657 if(r == 0) es_addBuf(&url, (char*)parent, ustrlen(parent));
658 }
659
660 if(pWrkrData->restURL != NULL)
661 free(pWrkrData->restURL);
662
663 pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL);
664 curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_URL, pWrkrData->restURL);
665 DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL);
666
667 finalize_it:
668 if (url != NULL)
669 es_deleteStr(url);
670 RETiRet;
671 }
672
673
674 /* this method computes the expected size of adding the next message into
675 * the batched request to elasticsearch
676 */
677 static size_t
computeMessageSize(const wrkrInstanceData_t * const pWrkrData,const uchar * const message,uchar ** const tpls)678 computeMessageSize(const wrkrInstanceData_t *const pWrkrData,
679 const uchar *const message,
680 uchar **const tpls)
681 {
682 size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
683 if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
684 r += sizeof(META_STRT_CREATE)-1;
685 else
686 r += sizeof(META_STRT)-1;
687
688 uchar *searchIndex = NULL;
689 uchar *searchType;
690 uchar *parent = NULL;
691 uchar *bulkId = NULL;
692 uchar *pipelineName;
693
694 getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
695 r += ustrlen((char *)message) + ustrlen(searchIndex) + ustrlen(searchType);
696
697 if(parent != NULL) {
698 r += sizeof(META_PARENT)-1 + ustrlen(parent);
699 }
700 if(bulkId != NULL) {
701 r += sizeof(META_ID)-1 + ustrlen(bulkId);
702 }
703 if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
704 r += sizeof(META_PIPELINE)-1 + ustrlen(pipelineName);
705 }
706
707 return r;
708 }
709
710
711 /* this method does not directly submit but builds a batch instead. It
712 * may submit, if we have dynamic index/type and the current type or
713 * index changes.
714 */
715 static rsRetVal
buildBatch(wrkrInstanceData_t * pWrkrData,uchar * message,uchar ** tpls)716 buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
717 {
718 int length = strlen((char *)message);
719 int r;
720 uchar *searchIndex = NULL;
721 uchar *searchType;
722 uchar *parent = NULL;
723 uchar *bulkId = NULL;
724 uchar *pipelineName;
725 DEFiRet;
726
727 getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
728 if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
729 r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
730 else
731 r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
732 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
733 ustrlen(searchIndex));
734 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
735 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
736 ustrlen(searchType));
737 if(parent != NULL) {
738 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
739 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent));
740 }
741 if(pipelineName != NULL && (!pWrkrData->pData->skipPipelineIfEmpty || pipelineName[0] != '\0')) {
742 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PIPELINE, sizeof(META_PIPELINE)-1);
743 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)pipelineName, ustrlen(pipelineName));
744 }
745 if(bulkId != NULL) {
746 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1);
747 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId));
748 }
749 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
750 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length);
751 if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1);
752 if(r != 0) {
753 LogError(0, RS_RET_ERR,
754 "omelasticsearch: growing batch failed with code %d", r);
755 ABORT_FINALIZE(RS_RET_ERR);
756 }
757 ++pWrkrData->batch.nmemb;
758 iRet = RS_RET_OK;
759
760 finalize_it:
761 RETiRet;
762 }
763
764 /*
765 * Dumps entire bulk request and response in error log
766 */
767 static rsRetVal
getDataErrorDefault(wrkrInstanceData_t * pWrkrData,fjson_object ** pReplyRoot,uchar * reqmsg,char ** rendered)768 getDataErrorDefault(wrkrInstanceData_t *pWrkrData,fjson_object **pReplyRoot,uchar *reqmsg,char **rendered)
769 {
770 DEFiRet;
771 fjson_object *req=NULL;
772 fjson_object *errRoot=NULL;
773 fjson_object *replyRoot = *pReplyRoot;
774
775 if((req=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
776 fjson_object_object_add(req, "url", fjson_object_new_string((char*)pWrkrData->restURL));
777 fjson_object_object_add(req, "postdata", fjson_object_new_string((char*)reqmsg));
778
779 if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
780 fjson_object_object_add(errRoot, "request", req);
781 fjson_object_object_add(errRoot, "reply", replyRoot);
782 *rendered = strdup((char*)fjson_object_to_json_string(errRoot));
783
784 req=NULL;
785 fjson_object_put(errRoot);
786
787 *pReplyRoot = NULL; /* tell caller not to delete once again! */
788
789 finalize_it:
790 fjson_object_put(req);
791 RETiRet;
792 }
793
794 /*
795 * Sets bulkRequestNextSectionStart pointer to next sections start in the buffer pointed by bulkRequest.
796 * Sections are marked by { and }
797 */
798 static rsRetVal
getSection(const char * bulkRequest,const char ** bulkRequestNextSectionStart)799 getSection(const char* bulkRequest, const char **bulkRequestNextSectionStart )
800 {
801 DEFiRet;
802 char* idx =0;
803 if( (idx = strchr(bulkRequest,'\n')) != 0)/*intermediate section*/
804 {
805 *bulkRequestNextSectionStart = ++idx;
806 }
807 else
808 {
809 *bulkRequestNextSectionStart=0;
810 ABORT_FINALIZE(RS_RET_ERR);
811 }
812
813 finalize_it:
814 RETiRet;
815 }
816
817 /*
818 * Sets the new string in singleRequest for one request in bulkRequest
819 * and sets lastLocation pointer to the location till which bulkrequest has been parsed.
820 * (used as input to make function thread safe.)
821 */
822 static rsRetVal
getSingleRequest(const char * bulkRequest,char ** singleRequest,const char ** lastLocation)823 getSingleRequest(const char* bulkRequest, char** singleRequest, const char **lastLocation)
824 {
825 DEFiRet;
826 const char *req = bulkRequest;
827 const char *start = bulkRequest;
828 if (getSection(req,&req)!=RS_RET_OK)
829 ABORT_FINALIZE(RS_RET_ERR);
830
831 if (getSection(req,&req)!=RS_RET_OK)
832 ABORT_FINALIZE(RS_RET_ERR);
833
834 CHKmalloc(*singleRequest = (char*) calloc (req - start+ 1 + 1,1));
835 /* (req - start+ 1 == length of data + 1 for terminal char)*/
836 memcpy(*singleRequest,start,req - start);
837 *lastLocation=req;
838
839 finalize_it:
840 RETiRet;
841 }
842
843 /*
844 * check the status of response from ES
845 */
checkReplyStatus(fjson_object * ok)846 static int checkReplyStatus(fjson_object* ok) {
847 return (ok == NULL || !fjson_object_is_type(ok, fjson_type_int) || fjson_object_get_int(ok) < 0 ||
848 fjson_object_get_int(ok) > 299);
849 }
850
851 /*
852 * Context object for error file content creation or status check
853 * response_item - the full {"create":{"_index":"idxname",.....}}
854 * response_body - the inner hash of the response_item - {"_index":"idxname",...}
855 * status - the "status" field from the inner hash - "status":500
856 * should be able to use fjson_object_get_int(status) to get the http result code
857 */
858 typedef struct exeContext{
859 int statusCheckOnly;
860 fjson_object *errRoot;
861 rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
862 fjson_object *response_item, fjson_object *response_body, fjson_object *status);
863 es_write_ops_t writeOperation;
864 ratelimit_t *ratelimiter;
865 ruleset_t *retryRuleset;
866 struct json_tokener *jTokener;
867 } context;
868
869 /*
870 * get content to be written in error file using context passed
871 */
872 static rsRetVal
parseRequestAndResponseForContext(wrkrInstanceData_t * pWrkrData,fjson_object ** pReplyRoot,uchar * reqmsg,context * ctx)873 parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **pReplyRoot,uchar *reqmsg,context *ctx)
874 {
875 DEFiRet;
876 fjson_object *replyRoot = *pReplyRoot;
877 int i;
878 int numitems;
879 fjson_object *items=NULL, *jo_errors = NULL;
880 int errors = 0;
881
882 if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
883 errors = fjson_object_get_boolean(jo_errors);
884 if (!errors && pWrkrData->pData->retryFailures) {
885 return RS_RET_OK;
886 }
887 }
888
889 /*iterate over items*/
890 if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
891 LogError(0, RS_RET_DATAFAIL,
892 "omelasticsearch: error in elasticsearch reply: "
893 "bulkmode insert does not return array, reply is: %s",
894 pWrkrData->reply);
895 ABORT_FINALIZE(RS_RET_DATAFAIL);
896 }
897
898 numitems = fjson_object_array_length(items);
899
900 if (reqmsg) {
901 DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
902 } else {
903 DBGPRINTF("omelasticsearch: Empty request\n");
904 }
905 const char *lastReqRead= (char*)reqmsg;
906
907 DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
908 for(i = 0 ; i < numitems ; ++i) {
909
910 fjson_object *item=NULL;
911 fjson_object *result=NULL;
912 fjson_object *ok=NULL;
913 int itemStatus=0;
914 item = fjson_object_array_get_idx(items, i);
915 if(item == NULL) {
916 LogError(0, RS_RET_DATAFAIL,
917 "omelasticsearch: error in elasticsearch reply: "
918 "cannot obtain reply array item %d", i);
919 ABORT_FINALIZE(RS_RET_DATAFAIL);
920 }
921 fjson_object_object_get_ex(item, "create", &result);
922 if(result == NULL || !fjson_object_is_type(result, fjson_type_object)) {
923 fjson_object_object_get_ex(item, "index", &result);
924 if(result == NULL || !fjson_object_is_type(result, fjson_type_object)) {
925 LogError(0, RS_RET_DATAFAIL,
926 "omelasticsearch: error in elasticsearch reply: "
927 "cannot obtain 'result' item for #%d", i);
928 ABORT_FINALIZE(RS_RET_DATAFAIL);
929 }
930 }
931
932 fjson_object_object_get_ex(result, "status", &ok);
933 itemStatus = checkReplyStatus(ok);
934
935 char *request =0;
936 char *response =0;
937 if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
938 if(itemStatus) {
939 DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, "
940 "status is %d\n", i, fjson_object_get_int(ok));
941 DBGPRINTF("omelasticsearch: status check found error.\n");
942 ABORT_FINALIZE(RS_RET_DATAFAIL);
943 }
944
945 } else {
946 if(getSingleRequest(lastReqRead,&request,&lastReqRead) != RS_RET_OK) {
947 DBGPRINTF("omelasticsearch: Couldn't get post request\n");
948 ABORT_FINALIZE(RS_RET_ERR);
949 }
950 response = (char*)fjson_object_to_json_string_ext(result, FJSON_TO_STRING_PLAIN);
951
952 if(response==NULL) {
953 free(request);/*as its has been assigned.*/
954 DBGPRINTF("omelasticsearch: Error getting fjson_object_to_string_ext. Cannot "
955 "continue\n");
956 ABORT_FINALIZE(RS_RET_ERR);
957 }
958
959 /*call the context*/
960 rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
961 response, item, result, ok);
962
963 /*free memory in any case*/
964 free(request);
965
966 if(ret != RS_RET_OK) {
967 DBGPRINTF("omelasticsearch: Error in preparing errorfileContent. Cannot continue\n");
968 ABORT_FINALIZE(RS_RET_ERR);
969 }
970 }
971 }
972
973 finalize_it:
974 RETiRet;
975 }
976
977 /*
978 * Dumps only failed requests of bulk insert
979 */
980 static rsRetVal
getDataErrorOnly(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)981 getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
982 fjson_object *response_item, fjson_object *response_body, fjson_object *status)
983 {
984 DEFiRet;
985 (void)response_item; /* unused */
986 (void)response_body; /* unused */
987 (void)status; /* unused */
988 if(itemStatus) {
989 fjson_object *onlyErrorResponses =NULL;
990 fjson_object *onlyErrorRequests=NULL;
991
992 if(!fjson_object_object_get_ex(ctx->errRoot, "reply", &onlyErrorResponses)) {
993 DBGPRINTF("omelasticsearch: Failed to get reply json array. Invalid context. Cannot "
994 "continue\n");
995 ABORT_FINALIZE(RS_RET_ERR);
996 }
997 fjson_object_array_add(onlyErrorResponses, fjson_object_new_string(response));
998
999 if(!fjson_object_object_get_ex(ctx->errRoot, "request", &onlyErrorRequests)) {
1000 DBGPRINTF("omelasticsearch: Failed to get request json array. Invalid context. Cannot "
1001 "continue\n");
1002 ABORT_FINALIZE(RS_RET_ERR);
1003 }
1004
1005 fjson_object_array_add(onlyErrorRequests, fjson_object_new_string(request));
1006
1007 }
1008
1009 finalize_it:
1010 RETiRet;
1011 }
1012
1013 /*
1014 * Dumps all requests of bulk insert interleaved with request and response
1015 */
1016
1017 static rsRetVal
getDataInterleaved(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)1018 getDataInterleaved(context *ctx,
1019 int __attribute__((unused)) itemStatus,
1020 char *request,
1021 char *response,
1022 fjson_object *response_item,
1023 fjson_object *response_body,
1024 fjson_object *status
1025 )
1026 {
1027 DEFiRet;
1028 (void)response_item; /* unused */
1029 (void)response_body; /* unused */
1030 (void)status; /* unused */
1031 fjson_object *interleaved =NULL;
1032 if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved)) {
1033 DBGPRINTF("omelasticsearch: Failed to get response json array. Invalid context. Cannot continue\n");
1034 ABORT_FINALIZE(RS_RET_ERR);
1035 }
1036
1037 fjson_object *interleavedNode=NULL;
1038 /*create interleaved node that has req and response json data*/
1039 if((interleavedNode=fjson_object_new_object()) == NULL)
1040 {
1041 DBGPRINTF("omelasticsearch: Failed to create interleaved node. Cann't continue\n");
1042 ABORT_FINALIZE(RS_RET_ERR);
1043 }
1044 fjson_object_object_add(interleavedNode,"request", fjson_object_new_string(request));
1045 fjson_object_object_add(interleavedNode,"reply", fjson_object_new_string(response));
1046
1047 fjson_object_array_add(interleaved, interleavedNode);
1048
1049
1050
1051 finalize_it:
1052 RETiRet;
1053 }
1054
1055
1056 /*
1057 * Dumps only failed requests of bulk insert interleaved with request and response
1058 */
1059
1060 static rsRetVal
getDataErrorOnlyInterleaved(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)1061 getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
1062 fjson_object *response_item, fjson_object *response_body, fjson_object *status)
1063 {
1064 DEFiRet;
1065 if (itemStatus) {
1066 if(getDataInterleaved(ctx, itemStatus,request,response,
1067 response_item, response_body, status)!= RS_RET_OK) {
1068 ABORT_FINALIZE(RS_RET_ERR);
1069 }
1070 }
1071
1072 finalize_it:
1073 RETiRet;
1074 }
1075
1076 /* input JSON looks like this:
1077 * {"someoperation":{"field1":"value1","field2":{.....}}}
1078 * output looks like this:
1079 * {"writeoperation":"someoperation","field1":"value1","field2":{.....}}
1080 */
1081 static rsRetVal
formatBulkReqOrResp(fjson_object * jo_input,fjson_object * jo_output)1082 formatBulkReqOrResp(fjson_object *jo_input, fjson_object *jo_output)
1083 {
1084 DEFiRet;
1085 fjson_object *jo = NULL;
1086 struct json_object_iterator it = json_object_iter_begin(jo_input);
1087 struct json_object_iterator itEnd = json_object_iter_end(jo_input);
1088
1089 /* set the writeoperation if not already set */
1090 if (!fjson_object_object_get_ex(jo_output, "writeoperation", NULL)) {
1091 const char *optype = NULL;
1092 if (!json_object_iter_equal(&it, &itEnd))
1093 optype = json_object_iter_peek_name(&it);
1094 if (optype) {
1095 jo = json_object_new_string(optype);
1096 } else {
1097 jo = json_object_new_string("unknown");
1098 }
1099 CHKmalloc(jo);
1100 json_object_object_add(jo_output, "writeoperation", jo);
1101 }
1102 if (!json_object_iter_equal(&it, &itEnd)) {
1103 /* now iterate the operation object */
1104 jo = json_object_iter_peek_value(&it);
1105 it = json_object_iter_begin(jo);
1106 itEnd = json_object_iter_end(jo);
1107 while (!json_object_iter_equal(&it, &itEnd)) {
1108 const char *name = json_object_iter_peek_name(&it);
1109 /* do not overwrite existing fields */
1110 if (!fjson_object_object_get_ex(jo_output, name, NULL)) {
1111 json_object_object_add(jo_output, name,
1112 json_object_get(json_object_iter_peek_value(&it)));
1113 }
1114 json_object_iter_next(&it);
1115 }
1116 }
1117 finalize_it:
1118 RETiRet;
1119 }
1120
1121 /* request string looks like this (other fields are "_parent" and "pipeline")
1122 * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
1123 * \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
1124 * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
1125 * store the metadata header fields in the metadata object
1126 * start = first \n + 1
1127 * end = last \n
1128 */
1129 static rsRetVal
createMsgFromRequest(const char * request,context * ctx,smsg_t ** msg,fjson_object * omes)1130 createMsgFromRequest(const char *request, context *ctx, smsg_t **msg, fjson_object *omes)
1131 {
1132 DEFiRet;
1133 fjson_object *jo_msg = NULL, *jo_metadata = NULL, *jo_request = NULL;
1134 const char *datastart, *dataend;
1135 size_t datalen;
1136 enum json_tokener_error json_error;
1137
1138 *msg = NULL;
1139 if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
1140 LogError(0, RS_RET_ERR,
1141 "omelasticsearch: malformed original request - "
1142 "could not find start of original data [%s]",
1143 request);
1144 ABORT_FINALIZE(RS_RET_ERR);
1145 }
1146 datalen = datastart - request;
1147 json_tokener_reset(ctx->jTokener);
1148 jo_metadata = json_tokener_parse_ex(ctx->jTokener, request, datalen);
1149 json_error = fjson_tokener_get_error(ctx->jTokener);
1150 if (!jo_metadata || (json_error != fjson_tokener_success)) {
1151 LogError(0, RS_RET_ERR,
1152 "omelasticsearch: parse error [%s] - could not convert original "
1153 "request metadata header JSON back into JSON object [%s]",
1154 fjson_tokener_error_desc(json_error), request);
1155 ABORT_FINALIZE(RS_RET_ERR);
1156 }
1157 CHKiRet(formatBulkReqOrResp(jo_metadata, omes));
1158
1159 datastart++; /* advance to '{' */
1160 if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
1161 LogError(0, RS_RET_ERR,
1162 "omelasticsearch: malformed original request - "
1163 "could not find end of original data [%s]",
1164 request);
1165 ABORT_FINALIZE(RS_RET_ERR);
1166 }
1167 datalen = dataend - datastart;
1168 json_tokener_reset(ctx->jTokener);
1169 jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
1170 json_error = fjson_tokener_get_error(ctx->jTokener);
1171 if (!jo_request || (json_error != fjson_tokener_success)) {
1172 LogError(0, RS_RET_ERR,
1173 "omelasticsearch: parse error [%s] - could not convert original "
1174 "request JSON back into JSON object [%s]",
1175 fjson_tokener_error_desc(json_error), request);
1176 ABORT_FINALIZE(RS_RET_ERR);
1177 }
1178
1179 CHKiRet(msgConstruct(msg));
1180 MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
1181 MsgSetInputName(*msg, pInputName);
1182 if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
1183 const char *rawmsg = json_object_get_string(jo_msg);
1184 const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
1185 MsgSetRawMsg(*msg, rawmsg, msgLen);
1186 } else {
1187 /* use entire data part of request as rawmsg */
1188 MsgSetRawMsg(*msg, datastart, datalen);
1189 }
1190 MsgSetMSGoffs(*msg, 0); /* we do not have a header... */
1191 MsgSetTAG(*msg, (const uchar *)"omes", 4);
1192 CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
1193
1194 finalize_it:
1195 if (jo_metadata)
1196 json_object_put(jo_metadata);
1197 RETiRet;
1198 }
1199
1200
1201 static rsRetVal
getDataRetryFailures(context * ctx,int itemStatus,char * request,char * response,fjson_object * response_item,fjson_object * response_body,fjson_object * status)1202 getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
1203 fjson_object *response_item, fjson_object *response_body, fjson_object *status)
1204 {
1205 DEFiRet;
1206 fjson_object *omes = NULL, *jo = NULL;
1207 int istatus = fjson_object_get_int(status);
1208 int iscreateop = 0;
1209 const char *optype = NULL;
1210 smsg_t *msg = NULL;
1211 int need_free_omes = 0;
1212
1213 (void)response;
1214 (void)itemStatus;
1215 (void)response_body;
1216 CHKmalloc(omes = json_object_new_object());
1217 need_free_omes = 1;
1218 /* this adds metadata header fields to omes */
1219 if (RS_RET_OK != (iRet = createMsgFromRequest(request, ctx, &msg, omes))) {
1220 if (iRet != RS_RET_OUT_OF_MEMORY) {
1221 STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1222 } else {
1223 ABORT_FINALIZE(iRet);
1224 }
1225 }
1226 CHKmalloc(msg);
1227 /* this adds response fields as local variables to omes */
1228 if (RS_RET_OK != (iRet = formatBulkReqOrResp(response_item, omes))) {
1229 if (iRet != RS_RET_OUT_OF_MEMORY) {
1230 STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1231 } else {
1232 ABORT_FINALIZE(iRet);
1233 }
1234 }
1235 if (fjson_object_object_get_ex(omes, "writeoperation", &jo)) {
1236 optype = json_object_get_string(jo);
1237 if (optype && !strcmp("create", optype))
1238 iscreateop = 1;
1239 if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
1240 iscreateop = 1;
1241 }
1242
1243 if (!optype) {
1244 STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1245 LogMsg(0, RS_RET_ERR, LOG_INFO,
1246 "omelasticsearch: no recognized operation type in response [%s]",
1247 response);
1248 } else if ((istatus == 200) || (istatus == 201)) {
1249 STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
1250 } else if ((istatus == 409) && iscreateop) {
1251 STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
1252 } else if ((istatus == 400) || (istatus < 200)) {
1253 STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
1254 } else {
1255 fjson_object *error = NULL, *errtype = NULL;
1256 if(fjson_object_object_get_ex(omes, "error", &error) &&
1257 fjson_object_object_get_ex(error, "type", &errtype)) {
1258 if (istatus == 429) {
1259 STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
1260 } else {
1261 STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
1262 }
1263 } else {
1264 STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
1265 LogMsg(0, RS_RET_ERR, LOG_INFO,
1266 "omelasticsearch: unexpected error response [%s]",
1267 response);
1268 }
1269 }
1270 need_free_omes = 0;
1271 CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
1272 MsgSetRuleset(msg, ctx->retryRuleset);
1273 CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
1274 finalize_it:
1275 if (need_free_omes)
1276 json_object_put(omes);
1277 RETiRet;
1278 }
1279
1280 /*
1281 * get erroronly context
1282 */
1283 static rsRetVal
initializeErrorOnlyConext(wrkrInstanceData_t * pWrkrData,context * ctx)1284 initializeErrorOnlyConext(wrkrInstanceData_t *pWrkrData,context *ctx){
1285 DEFiRet;
1286 ctx->statusCheckOnly=0;
1287 fjson_object *errRoot=NULL;
1288 fjson_object *onlyErrorResponses =NULL;
1289 fjson_object *onlyErrorRequests=NULL;
1290 if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1291
1292 if((onlyErrorResponses=fjson_object_new_array()) == NULL) {
1293 fjson_object_put(errRoot);
1294 ABORT_FINALIZE(RS_RET_ERR);
1295 }
1296 if((onlyErrorRequests=fjson_object_new_array()) == NULL) {
1297 fjson_object_put(errRoot);
1298 fjson_object_put(onlyErrorResponses);
1299 ABORT_FINALIZE(RS_RET_ERR);
1300 }
1301
1302 fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1303 fjson_object_object_add(errRoot,"request",onlyErrorRequests);
1304 fjson_object_object_add(errRoot, "reply", onlyErrorResponses);
1305 ctx->errRoot = errRoot;
1306 ctx->prepareErrorFileContent= &getDataErrorOnly;
1307 finalize_it:
1308 RETiRet;
1309 }
1310
1311 /*
1312 * get interleaved context
1313 */
1314 static rsRetVal
initializeInterleavedConext(wrkrInstanceData_t * pWrkrData,context * ctx)1315 initializeInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
1316 DEFiRet;
1317 ctx->statusCheckOnly=0;
1318 fjson_object *errRoot=NULL;
1319 fjson_object *interleaved =NULL;
1320 if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1321 if((interleaved=fjson_object_new_array()) == NULL) {
1322 fjson_object_put(errRoot);
1323 ABORT_FINALIZE(RS_RET_ERR);
1324 }
1325
1326
1327 fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1328 fjson_object_object_add(errRoot,"response",interleaved);
1329 ctx->errRoot = errRoot;
1330 ctx->prepareErrorFileContent= &getDataInterleaved;
1331 finalize_it:
1332 RETiRet;
1333 }
1334
1335 /*get interleaved context*/
1336 static rsRetVal
initializeErrorInterleavedConext(wrkrInstanceData_t * pWrkrData,context * ctx)1337 initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
1338 DEFiRet;
1339 ctx->statusCheckOnly=0;
1340 fjson_object *errRoot=NULL;
1341 fjson_object *interleaved =NULL;
1342 if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1343 if((interleaved=fjson_object_new_array()) == NULL) {
1344 fjson_object_put(errRoot);
1345 ABORT_FINALIZE(RS_RET_ERR);
1346 }
1347
1348
1349 fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1350 fjson_object_object_add(errRoot,"response",interleaved);
1351 ctx->errRoot = errRoot;
1352 ctx->prepareErrorFileContent= &getDataErrorOnlyInterleaved;
1353 finalize_it:
1354 RETiRet;
1355 }
1356
1357 /*get retry failures context*/
1358 static rsRetVal
initializeRetryFailuresContext(wrkrInstanceData_t * pWrkrData,context * ctx)1359 initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
1360 DEFiRet;
1361 ctx->statusCheckOnly=0;
1362 fjson_object *errRoot=NULL;
1363 if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
1364
1365
1366 fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
1367 ctx->errRoot = errRoot;
1368 ctx->prepareErrorFileContent= &getDataRetryFailures;
1369 CHKmalloc(ctx->jTokener = json_tokener_new());
1370 finalize_it:
1371 RETiRet;
1372 }
1373
1374
1375 /* write data error request/replies to separate error file
1376 * Note: we open the file but never close it before exit. If it
1377 * needs to be closed, HUP must be sent.
1378 */
ATTR_NONNULL()1379 static rsRetVal ATTR_NONNULL()
1380 writeDataError(wrkrInstanceData_t *const pWrkrData,
1381 instanceData *const pData, fjson_object **const pReplyRoot,
1382 uchar *const reqmsg)
1383 {
1384 char *rendered = NULL;
1385 size_t toWrite;
1386 ssize_t wrRet;
1387 sbool bMutLocked = 0;
1388 context ctx;
1389 ctx.errRoot=0;
1390 ctx.writeOperation = pWrkrData->pData->writeOperation;
1391 ctx.ratelimiter = pWrkrData->pData->ratelimiter;
1392 ctx.retryRuleset = pWrkrData->pData->retryRuleset;
1393 ctx.jTokener = NULL;
1394 DEFiRet;
1395
1396 if(pData->errorFile == NULL) {
1397 DBGPRINTF("omelasticsearch: no local error logger defined - "
1398 "ignoring ES error information\n");
1399 FINALIZE;
1400 }
1401
1402 pthread_mutex_lock(&pData->mutErrFile);
1403 bMutLocked = 1;
1404
1405 DBGPRINTF("omelasticsearch: error file mode: erroronly='%d' errorInterleaved='%d'\n",
1406 pData->errorOnly, pData->interleaved);
1407
1408 if(pData->interleaved ==0 && pData->errorOnly ==0)/*default write*/
1409 {
1410 if(getDataErrorDefault(pWrkrData,pReplyRoot, reqmsg, &rendered) != RS_RET_OK) {
1411 ABORT_FINALIZE(RS_RET_ERR);
1412 }
1413 } else {
1414 /*get correct context.*/
1415 if(pData->interleaved && pData->errorOnly)
1416 {
1417 if(initializeErrorInterleavedConext(pWrkrData, &ctx) != RS_RET_OK) {
1418 DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
1419 ABORT_FINALIZE(RS_RET_ERR);
1420 }
1421
1422 } else if(pData->errorOnly) {
1423 if(initializeErrorOnlyConext(pWrkrData, &ctx) != RS_RET_OK) {
1424
1425 DBGPRINTF("omelasticsearch: error initializing error only context.\n");
1426 ABORT_FINALIZE(RS_RET_ERR);
1427 }
1428 } else if(pData->interleaved) {
1429 if(initializeInterleavedConext(pWrkrData, &ctx) != RS_RET_OK) {
1430 DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
1431 ABORT_FINALIZE(RS_RET_ERR);
1432 }
1433 } else if(pData->retryFailures) {
1434 if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
1435 DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
1436 ABORT_FINALIZE(RS_RET_ERR);
1437 }
1438 } else {
1439 DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
1440 ABORT_FINALIZE(RS_RET_ERR);
1441 }
1442
1443 /*execute context*/
1444 if(parseRequestAndResponseForContext(pWrkrData, pReplyRoot, reqmsg, &ctx)!= RS_RET_OK) {
1445 DBGPRINTF("omelasticsearch: error creating file content.\n");
1446 ABORT_FINALIZE(RS_RET_ERR);
1447 }
1448 CHKmalloc(rendered = strdup((char*)fjson_object_to_json_string(ctx.errRoot)));
1449 }
1450
1451
1452 if(pData->fdErrFile == -1) {
1453 pData->fdErrFile = open((char*)pData->errorFile,
1454 O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
1455 S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
1456 if(pData->fdErrFile == -1) {
1457 LogError(errno, RS_RET_ERR, "omelasticsearch: error opening error file %s",
1458 pData->errorFile);
1459 ABORT_FINALIZE(RS_RET_ERR);
1460 }
1461 }
1462
1463 /* we do not do real error-handling on the err file, as this finally complicates
1464 * things way to much.
1465 */
1466 DBGPRINTF("omelasticsearch: error record: '%s'\n", rendered);
1467 toWrite = strlen(rendered) + 1;
1468 /* Note: we overwrite the '\0' terminator with '\n' -- so we avoid
1469 * caling malloc() -- write() does NOT need '\0'!
1470 */
1471 rendered[toWrite-1] = '\n'; /* NO LONGER A STRING! */
1472 wrRet = write(pData->fdErrFile, rendered, toWrite);
1473 if(wrRet != (ssize_t) toWrite) {
1474 LogError(errno, RS_RET_IO_ERROR,
1475 "omelasticsearch: error writing error file %s, write returned %lld",
1476 pData->errorFile, (long long) wrRet);
1477 }
1478
1479 finalize_it:
1480 if(bMutLocked)
1481 pthread_mutex_unlock(&pData->mutErrFile);
1482 free(rendered);
1483 fjson_object_put(ctx.errRoot);
1484 if (ctx.jTokener)
1485 json_tokener_free(ctx.jTokener);
1486 RETiRet;
1487 }
1488
1489
1490 static rsRetVal
checkResultBulkmode(wrkrInstanceData_t * pWrkrData,fjson_object * root,uchar * reqmsg)1491 checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
1492 {
1493 DEFiRet;
1494 context ctx;
1495 ctx.errRoot = 0;
1496 ctx.writeOperation = pWrkrData->pData->writeOperation;
1497 ctx.ratelimiter = pWrkrData->pData->ratelimiter;
1498 ctx.retryRuleset = pWrkrData->pData->retryRuleset;
1499 ctx.statusCheckOnly=1;
1500 ctx.jTokener = NULL;
1501 if (pWrkrData->pData->retryFailures) {
1502 ctx.statusCheckOnly=0;
1503 CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
1504 }
1505 if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
1506 DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
1507 ABORT_FINALIZE(RS_RET_DATAFAIL);
1508 }
1509
1510 finalize_it:
1511 fjson_object_put(ctx.errRoot);
1512 if (ctx.jTokener)
1513 json_tokener_free(ctx.jTokener);
1514 RETiRet;
1515 }
1516
1517
1518 static rsRetVal
checkResult(wrkrInstanceData_t * pWrkrData,uchar * reqmsg)1519 checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
1520 {
1521 fjson_object *root;
1522 fjson_object *status;
1523 DEFiRet;
1524
1525 root = fjson_tokener_parse(pWrkrData->reply);
1526 if(root == NULL) {
1527 LogMsg(0, RS_RET_ERR, LOG_WARNING,
1528 "omelasticsearch: could not parse JSON result");
1529 ABORT_FINALIZE(RS_RET_ERR);
1530 }
1531
1532 if(pWrkrData->pData->bulkmode) {
1533 iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
1534 } else {
1535 if(fjson_object_object_get_ex(root, "status", &status)) {
1536 iRet = RS_RET_DATAFAIL;
1537 }
1538 }
1539
1540 /* Note: we ignore errors writing the error file, as we cannot handle
1541 * these in any case.
1542 */
1543 if(iRet == RS_RET_DATAFAIL) {
1544 STATSCOUNTER_INC(indexESFail, mutIndexESFail);
1545 writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg);
1546 iRet = RS_RET_OK; /* we have handled the problem! */
1547 }
1548
1549 finalize_it:
1550 if(root != NULL)
1551 fjson_object_put(root);
1552 if(iRet != RS_RET_OK) {
1553 STATSCOUNTER_INC(indexESFail, mutIndexESFail);
1554 }
1555 RETiRet;
1556 }
1557
ATTR_NONNULL()1558 static void ATTR_NONNULL()
1559 initializeBatch(wrkrInstanceData_t *pWrkrData)
1560 {
1561 es_emptyStr(pWrkrData->batch.data);
1562 pWrkrData->batch.nmemb = 0;
1563 }
1564
1565 static rsRetVal ATTR_NONNULL(1, 2)
curlPost(wrkrInstanceData_t * pWrkrData,uchar * message,int msglen,uchar ** tpls,const int nmsgs)1566 curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, const int nmsgs)
1567 {
1568 CURLcode code;
1569 CURL *const curl = pWrkrData->curlPostHandle;
1570 char errbuf[CURL_ERROR_SIZE] = "";
1571 DEFiRet;
1572
1573 PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1574
1575 if ((pWrkrData->pData->rebindInterval > -1) &&
1576 (pWrkrData->nOperations > pWrkrData->pData->rebindInterval)) {
1577 curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 1);
1578 pWrkrData->nOperations = 0;
1579 STATSCOUNTER_INC(rebinds, mutRebinds);
1580 } else {
1581 /* by default, reuse existing connections */
1582 curl_easy_setopt(curl, CURLOPT_FRESH_CONNECT, 0);
1583 }
1584 if ((pWrkrData->pData->rebindInterval > -1) &&
1585 (pWrkrData->nOperations == pWrkrData->pData->rebindInterval)) {
1586 curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1);
1587 } else {
1588 curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 0);
1589 }
1590
1591 if(pWrkrData->pData->numServers > 1) {
1592 /* needs to be called to support ES HA feature */
1593 CHKiRet(checkConn(pWrkrData));
1594 }
1595 pWrkrData->replyLen = 0;
1596 CHKiRet(setPostURL(pWrkrData, tpls));
1597
1598 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
1599 curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
1600 curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf);
1601 code = curl_easy_perform(curl);
1602 DBGPRINTF("curl returned %lld\n", (long long) code);
1603 if (code != CURLE_OK && code != CURLE_HTTP_RETURNED_ERROR) {
1604 STATSCOUNTER_INC(indexHTTPReqFail, mutIndexHTTPReqFail);
1605 indexHTTPFail += nmsgs;
1606 LogError(0, RS_RET_SUSPENDED,
1607 "omelasticsearch: we are suspending ourselfs due "
1608 "to server failure %lld: %s", (long long) code, errbuf);
1609 ABORT_FINALIZE(RS_RET_SUSPENDED);
1610 }
1611
1612 if (pWrkrData->pData->rebindInterval > -1)
1613 pWrkrData->nOperations++;
1614
1615 if(pWrkrData->reply == NULL) {
1616 DBGPRINTF("omelasticsearch: pWrkrData reply==NULL, replyLen = '%d'\n",
1617 pWrkrData->replyLen);
1618 } else {
1619 DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen);
1620 if(pWrkrData->replyLen > 0) {
1621 pWrkrData->reply[pWrkrData->replyLen] = '\0';
1622 /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */
1623 }
1624 DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply);
1625 CHKiRet(checkResult(pWrkrData, message));
1626 }
1627
1628 finalize_it:
1629 incrementServerIndex(pWrkrData);
1630 RETiRet;
1631 }
1632
1633 static rsRetVal
submitBatch(wrkrInstanceData_t * pWrkrData)1634 submitBatch(wrkrInstanceData_t *pWrkrData)
1635 {
1636 char *cstr = NULL;
1637 DEFiRet;
1638
1639 cstr = es_str2cstr(pWrkrData->batch.data, NULL);
1640 dbgprintf("omelasticsearch: submitBatch, batch: '%s'\n", cstr);
1641
1642 CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb));
1643
1644 finalize_it:
1645 free(cstr);
1646 RETiRet;
1647 }
1648
1649 BEGINbeginTransaction
1650 CODESTARTbeginTransaction
1651 if(!pWrkrData->pData->bulkmode) {
1652 FINALIZE;
1653 }
1654
1655 initializeBatch(pWrkrData);
1656 finalize_it:
1657 ENDbeginTransaction
1658
1659 BEGINdoAction
1660 CODESTARTdoAction
1661 STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
1662
1663 if(pWrkrData->pData->bulkmode) {
1664 const size_t nBytes = computeMessageSize(pWrkrData, ppString[0], ppString);
1665
1666 /* If max bytes is set and this next message will put us over the limit,
1667 * submit the current buffer and reset */
1668 if(pWrkrData->pData->maxbytes > 0
1669 && es_strlen(pWrkrData->batch.data) + nBytes > pWrkrData->pData->maxbytes ) {
1670 dbgprintf("omelasticsearch: maxbytes limit reached, submitting partial "
1671 "batch of %d elements.\n", pWrkrData->batch.nmemb);
1672 CHKiRet(submitBatch(pWrkrData));
1673 initializeBatch(pWrkrData);
1674 }
1675 CHKiRet(buildBatch(pWrkrData, ppString[0], ppString));
1676
1677 /* If there is only one item in the batch, all previous items have been
1678 * submitted or this is the first item for this transaction. Return previous
1679 * committed so that all items leading up to the current (exclusive)
1680 * are not replayed should a failure occur anywhere else in the transaction. */
1681 iRet = pWrkrData->batch.nmemb == 1 ? RS_RET_PREVIOUS_COMMITTED : RS_RET_DEFER_COMMIT;
1682 } else {
1683 CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]),
1684 ppString, 1));
1685 }
1686 finalize_it:
1687 ENDdoAction
1688
1689
1690 BEGINendTransaction
1691 CODESTARTendTransaction
1692 /* End Transaction only if batch data is not empty */
1693 if (pWrkrData->batch.data != NULL && pWrkrData->batch.nmemb > 0) {
1694 CHKiRet(submitBatch(pWrkrData));
1695 } else {
1696 dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, "
1697 "nothing to send. \n");
1698 }
1699 finalize_it:
1700 ENDendTransaction
1701
1702 static rsRetVal
computeAuthHeader(char * uid,char * pwd,uchar ** authBuf)1703 computeAuthHeader(char* uid, char* pwd, uchar** authBuf) {
1704 int r;
1705 DEFiRet;
1706
1707 es_str_t* auth = es_newStr(1024);
1708 if (auth == NULL) {
1709 LogError(0, RS_RET_OUT_OF_MEMORY,
1710 "omelasticsearch: failed to allocate es_str auth for auth header construction");
1711 ABORT_FINALIZE(RS_RET_ERR);
1712 }
1713
1714 r = es_addBuf(&auth, uid, strlen(uid));
1715 if(r == 0) r = es_addChar(&auth, ':');
1716 if(r == 0 && pwd != NULL) r = es_addBuf(&auth, pwd, strlen(pwd));
1717 if(r == 0) *authBuf = (uchar*) es_str2cstr(auth, NULL);
1718
1719 if (r != 0 || *authBuf == NULL) {
1720 LogError(0, RS_RET_ERR, "omelasticsearch: failed to build auth header\n");
1721 ABORT_FINALIZE(RS_RET_ERR);
1722 }
1723
1724 finalize_it:
1725 if (auth != NULL)
1726 es_deleteStr(auth);
1727 RETiRet;
1728 }
1729
ATTR_NONNULL()1730 static void ATTR_NONNULL()
1731 curlSetupCommon(wrkrInstanceData_t *const pWrkrData, CURL *const handle)
1732 {
1733 PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1734 curl_easy_setopt(handle, CURLOPT_HTTPHEADER, pWrkrData->curlHeader);
1735 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, TRUE);
1736 curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
1737 curl_easy_setopt(handle, CURLOPT_WRITEDATA, pWrkrData);
1738 if(pWrkrData->pData->allowUnsignedCerts)
1739 curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, FALSE);
1740 if(pWrkrData->pData->skipVerifyHost)
1741 curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, FALSE);
1742 if(pWrkrData->pData->authBuf != NULL) {
1743 curl_easy_setopt(handle, CURLOPT_USERPWD, pWrkrData->pData->authBuf);
1744 curl_easy_setopt(handle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
1745 }
1746 if(pWrkrData->pData->caCertFile)
1747 curl_easy_setopt(handle, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
1748 if(pWrkrData->pData->myCertFile)
1749 curl_easy_setopt(handle, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile);
1750 if(pWrkrData->pData->myPrivKeyFile)
1751 curl_easy_setopt(handle, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile);
1752 /* uncomment for in-dept debuggung:
1753 curl_easy_setopt(handle, CURLOPT_VERBOSE, TRUE); */
1754 }
1755
ATTR_NONNULL()1756 static void ATTR_NONNULL()
1757 curlCheckConnSetup(wrkrInstanceData_t *const pWrkrData)
1758 {
1759 PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1760 curlSetupCommon(pWrkrData, pWrkrData->curlCheckConnHandle);
1761 curl_easy_setopt(pWrkrData->curlCheckConnHandle,
1762 CURLOPT_TIMEOUT_MS, pWrkrData->pData->healthCheckTimeout);
1763 }
1764
1765 static void ATTR_NONNULL(1)
curlPostSetup(wrkrInstanceData_t * const pWrkrData)1766 curlPostSetup(wrkrInstanceData_t *const pWrkrData)
1767 {
1768 PTR_ASSERT_SET_TYPE(pWrkrData, WRKR_DATA_TYPE_ES);
1769 curlSetupCommon(pWrkrData, pWrkrData->curlPostHandle);
1770 curl_easy_setopt(pWrkrData->curlPostHandle, CURLOPT_POST, 1);
1771 }
1772
1773 #define CONTENT_JSON "Content-Type: application/json; charset=utf-8"
1774
ATTR_NONNULL()1775 static rsRetVal ATTR_NONNULL()
1776 curlSetup(wrkrInstanceData_t *const pWrkrData)
1777 {
1778 DEFiRet;
1779 pWrkrData->curlHeader = curl_slist_append(NULL, CONTENT_JSON);
1780 CHKmalloc(pWrkrData->curlPostHandle = curl_easy_init());;
1781 curlPostSetup(pWrkrData);
1782
1783 CHKmalloc(pWrkrData->curlCheckConnHandle = curl_easy_init());
1784 curlCheckConnSetup(pWrkrData);
1785
1786 finalize_it:
1787 if(iRet != RS_RET_OK && pWrkrData->curlPostHandle != NULL) {
1788 curl_easy_cleanup(pWrkrData->curlPostHandle);
1789 pWrkrData->curlPostHandle = NULL;
1790 }
1791 RETiRet;
1792 }
1793
ATTR_NONNULL()1794 static void ATTR_NONNULL()
1795 setInstParamDefaults(instanceData *const pData)
1796 {
1797 pData->serverBaseUrls = NULL;
1798 pData->defaultPort = 9200;
1799 pData->healthCheckTimeout = 3500;
1800 pData->uid = NULL;
1801 pData->pwd = NULL;
1802 pData->authBuf = NULL;
1803 pData->searchIndex = NULL;
1804 pData->searchType = NULL;
1805 pData->pipelineName = NULL;
1806 pData->dynPipelineName = 0;
1807 pData->skipPipelineIfEmpty = 0;
1808 pData->parent = NULL;
1809 pData->timeout = NULL;
1810 pData->dynSrchIdx = 0;
1811 pData->dynSrchType = 0;
1812 pData->dynParent = 0;
1813 pData->useHttps = 0;
1814 pData->bulkmode = 0;
1815 pData->maxbytes = 104857600; //100 MB Is the default max message size that ships with ElasticSearch
1816 pData->allowUnsignedCerts = 0;
1817 pData->skipVerifyHost = 0;
1818 pData->tplName = NULL;
1819 pData->errorFile = NULL;
1820 pData->errorOnly=0;
1821 pData->interleaved=0;
1822 pData->dynBulkId= 0;
1823 pData->bulkId = NULL;
1824 pData->caCertFile = NULL;
1825 pData->myCertFile = NULL;
1826 pData->myPrivKeyFile = NULL;
1827 pData->writeOperation = ES_WRITE_INDEX;
1828 pData->retryFailures = 0;
1829 pData->ratelimitBurst = 20000;
1830 pData->ratelimitInterval = 600;
1831 pData->ratelimiter = NULL;
1832 pData->retryRulesetName = NULL;
1833 pData->retryRuleset = NULL;
1834 pData->rebindInterval = DEFAULT_REBIND_INTERVAL;
1835 }
1836
1837 BEGINnewActInst
1838 struct cnfparamvals *pvals;
1839 char* serverParam = NULL;
1840 struct cnfarray* servers = NULL;
1841 int i;
1842 int iNumTpls;
1843 FILE *fp;
1844 char errStr[1024];
1845 CODESTARTnewActInst
1846 if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
1847 ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
1848 }
1849
1850 CHKiRet(createInstance(&pData));
1851 setInstParamDefaults(pData);
1852
1853 for(i = 0 ; i < actpblk.nParams ; ++i) {
1854 if(!pvals[i].bUsed)
1855 continue;
1856 if(!strcmp(actpblk.descr[i].name, "server")) {
1857 servers = pvals[i].val.d.ar;
1858 } else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
1859 pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1860 } else if(!strcmp(actpblk.descr[i].name, "erroronly")) {
1861 pData->errorOnly = pvals[i].val.d.n;
1862 } else if(!strcmp(actpblk.descr[i].name, "interleaved")) {
1863 pData->interleaved = pvals[i].val.d.n;
1864 } else if(!strcmp(actpblk.descr[i].name, "serverport")) {
1865 pData->defaultPort = (int) pvals[i].val.d.n;
1866 } else if(!strcmp(actpblk.descr[i].name, "healthchecktimeout")) {
1867 pData->healthCheckTimeout = (long) pvals[i].val.d.n;
1868 } else if(!strcmp(actpblk.descr[i].name, "uid")) {
1869 pData->uid = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1870 } else if(!strcmp(actpblk.descr[i].name, "pwd")) {
1871 pData->pwd = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1872 } else if(!strcmp(actpblk.descr[i].name, "searchindex")) {
1873 pData->searchIndex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1874 } else if(!strcmp(actpblk.descr[i].name, "searchtype")) {
1875 pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1876 } else if(!strcmp(actpblk.descr[i].name, "pipelinename")) {
1877 pData->pipelineName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1878 } else if(!strcmp(actpblk.descr[i].name, "dynpipelinename")) {
1879 pData->dynPipelineName = pvals[i].val.d.n;
1880 } else if(!strcmp(actpblk.descr[i].name, "skippipelineifempty")) {
1881 pData->skipPipelineIfEmpty = pvals[i].val.d.n;
1882 } else if(!strcmp(actpblk.descr[i].name, "parent")) {
1883 pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1884 } else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) {
1885 pData->dynSrchIdx = pvals[i].val.d.n;
1886 } else if(!strcmp(actpblk.descr[i].name, "dynsearchtype")) {
1887 pData->dynSrchType = pvals[i].val.d.n;
1888 } else if(!strcmp(actpblk.descr[i].name, "dynparent")) {
1889 pData->dynParent = pvals[i].val.d.n;
1890 } else if(!strcmp(actpblk.descr[i].name, "bulkmode")) {
1891 pData->bulkmode = pvals[i].val.d.n;
1892 } else if(!strcmp(actpblk.descr[i].name, "maxbytes")) {
1893 pData->maxbytes = (size_t) pvals[i].val.d.n;
1894 } else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
1895 pData->allowUnsignedCerts = pvals[i].val.d.n;
1896 } else if(!strcmp(actpblk.descr[i].name, "skipverifyhost")) {
1897 pData->skipVerifyHost = pvals[i].val.d.n;
1898 } else if(!strcmp(actpblk.descr[i].name, "timeout")) {
1899 pData->timeout = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1900 } else if(!strcmp(actpblk.descr[i].name, "usehttps")) {
1901 pData->useHttps = pvals[i].val.d.n;
1902 } else if(!strcmp(actpblk.descr[i].name, "template")) {
1903 pData->tplName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1904 } else if(!strcmp(actpblk.descr[i].name, "dynbulkid")) {
1905 pData->dynBulkId = pvals[i].val.d.n;
1906 } else if(!strcmp(actpblk.descr[i].name, "bulkid")) {
1907 pData->bulkId = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1908 } else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
1909 pData->caCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1910 fp = fopen((const char*)pData->caCertFile, "r");
1911 if(fp == NULL) {
1912 rs_strerror_r(errno, errStr, sizeof(errStr));
1913 LogError(0, RS_RET_NO_FILE_ACCESS,
1914 "error: 'tls.cacert' file %s couldn't be accessed: %s\n",
1915 pData->caCertFile, errStr);
1916 } else {
1917 fclose(fp);
1918 }
1919 } else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
1920 pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1921 fp = fopen((const char*)pData->myCertFile, "r");
1922 if(fp == NULL) {
1923 rs_strerror_r(errno, errStr, sizeof(errStr));
1924 LogError(0, RS_RET_NO_FILE_ACCESS,
1925 "error: 'tls.mycert' file %s couldn't be accessed: %s\n",
1926 pData->myCertFile, errStr);
1927 } else {
1928 fclose(fp);
1929 }
1930 } else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
1931 pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1932 fp = fopen((const char*)pData->myPrivKeyFile, "r");
1933 if(fp == NULL) {
1934 rs_strerror_r(errno, errStr, sizeof(errStr));
1935 LogError(0, RS_RET_NO_FILE_ACCESS,
1936 "error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
1937 pData->myPrivKeyFile, errStr);
1938 } else {
1939 fclose(fp);
1940 }
1941 } else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
1942 char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
1943 if (writeop && !strcmp(writeop, "create")) {
1944 pData->writeOperation = ES_WRITE_CREATE;
1945 } else if (writeop && !strcmp(writeop, "index")) {
1946 pData->writeOperation = ES_WRITE_INDEX;
1947 } else if (writeop) {
1948 LogError(0, RS_RET_CONFIG_ERROR,
1949 "omelasticsearch: invalid value '%s' for writeoperation: "
1950 "must be one of 'index' or 'create' - using default value 'index'", writeop);
1951 pData->writeOperation = ES_WRITE_INDEX;
1952 }
1953 free(writeop);
1954 } else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
1955 pData->retryFailures = pvals[i].val.d.n;
1956 } else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
1957 pData->ratelimitBurst = (unsigned int) pvals[i].val.d.n;
1958 } else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
1959 pData->ratelimitInterval = (unsigned int) pvals[i].val.d.n;
1960 } else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
1961 pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
1962 } else if(!strcmp(actpblk.descr[i].name, "rebindinterval")) {
1963 pData->rebindInterval = (int) pvals[i].val.d.n;
1964 } else {
1965 LogError(0, RS_RET_INTERNAL_ERROR, "omelasticsearch: program error, "
1966 "non-handled param '%s'", actpblk.descr[i].name);
1967 }
1968 }
1969
1970 if(pData->pwd != NULL && pData->uid == NULL) {
1971 LogError(0, RS_RET_UID_MISSING,
1972 "omelasticsearch: password is provided, but no uid "
1973 "- action definition invalid");
1974 ABORT_FINALIZE(RS_RET_UID_MISSING);
1975 }
1976 if(pData->dynSrchIdx && pData->searchIndex == NULL) {
1977 LogError(0, RS_RET_CONFIG_ERROR,
1978 "omelasticsearch: requested dynamic search index, but no "
1979 "name for index template given - action definition invalid");
1980 ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1981 }
1982 if(pData->dynSrchType && pData->searchType == NULL) {
1983 LogError(0, RS_RET_CONFIG_ERROR,
1984 "omelasticsearch: requested dynamic search type, but no "
1985 "name for type template given - action definition invalid");
1986 ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1987 }
1988 if(pData->dynParent && pData->parent == NULL) {
1989 LogError(0, RS_RET_CONFIG_ERROR,
1990 "omelasticsearch: requested dynamic parent, but no "
1991 "name for parent template given - action definition invalid");
1992 ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1993 }
1994 if(pData->dynBulkId && pData->bulkId == NULL) {
1995 LogError(0, RS_RET_CONFIG_ERROR,
1996 "omelasticsearch: requested dynamic bulkid, but no "
1997 "name for bulkid template given - action definition invalid");
1998 ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
1999 }
2000 if(pData->dynPipelineName && pData->pipelineName == NULL) {
2001 LogError(0, RS_RET_CONFIG_ERROR,
2002 "omelasticsearch: requested dynamic pipeline name, but no "
2003 "name for pipelineName template given - action definition invalid");
2004 ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
2005 }
2006
2007 if (pData->uid != NULL)
2008 CHKiRet(computeAuthHeader((char*) pData->uid, (char*) pData->pwd, &pData->authBuf));
2009
2010 iNumTpls = 1;
2011 if(pData->dynSrchIdx) ++iNumTpls;
2012 if(pData->dynSrchType) ++iNumTpls;
2013 if(pData->dynParent) ++iNumTpls;
2014 if(pData->dynBulkId) ++iNumTpls;
2015 if(pData->dynPipelineName) ++iNumTpls;
2016 DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
2017 CODE_STD_STRING_REQUESTnewActInst(iNumTpls)
2018
2019 CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
2020 " StdJSONFmt" : (char*)pData->tplName),
2021 OMSR_NO_RQD_TPL_OPTS));
2022
2023
2024 /* we need to request additional templates. If we have a dynamic search index,
2025 * it will always be string 1. Type may be 1 or 2, depending on whether search
2026 * index is dynamic as well. Rule needs to be followed throughout the module.
2027 */
2028 iNumTpls = 1;
2029 if(pData->dynSrchIdx) {
2030 CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchIndex),
2031 OMSR_NO_RQD_TPL_OPTS));
2032 ++iNumTpls;
2033 }
2034 if(pData->dynSrchType) {
2035 CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchType),
2036 OMSR_NO_RQD_TPL_OPTS));
2037 ++iNumTpls;
2038 }
2039 if(pData->dynParent) {
2040 CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->parent),
2041 OMSR_NO_RQD_TPL_OPTS));
2042 ++iNumTpls;
2043 }
2044 if(pData->dynBulkId) {
2045 CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->bulkId),
2046 OMSR_NO_RQD_TPL_OPTS));
2047 ++iNumTpls;
2048 }
2049 if(pData->dynPipelineName) {
2050 CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->pipelineName),
2051 OMSR_NO_RQD_TPL_OPTS));
2052 ++iNumTpls;
2053 }
2054
2055
2056 if (servers != NULL) {
2057 pData->numServers = servers->nmemb;
2058 pData->serverBaseUrls = malloc(servers->nmemb * sizeof(uchar*));
2059 if (pData->serverBaseUrls == NULL) {
2060 LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer "
2061 "for ElasticSearch server configuration.");
2062 ABORT_FINALIZE(RS_RET_ERR);
2063 }
2064
2065 for(i = 0 ; i < servers->nmemb ; ++i) {
2066 serverParam = es_str2cstr(servers->arr[i], NULL);
2067 if (serverParam == NULL) {
2068 LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer "
2069 "for ElasticSearch server configuration.");
2070 ABORT_FINALIZE(RS_RET_ERR);
2071 }
2072 /* Remove a trailing slash if it exists */
2073 const size_t serverParamLastChar = strlen(serverParam)-1;
2074 if (serverParam[serverParamLastChar] == '/') {
2075 serverParam[serverParamLastChar] = '\0';
2076 }
2077 CHKiRet(computeBaseUrl(serverParam, pData->defaultPort, pData->useHttps,
2078 pData->serverBaseUrls + i));
2079 free(serverParam);
2080 serverParam = NULL;
2081 }
2082 } else {
2083 LogMsg(0, RS_RET_OK, LOG_WARNING,
2084 "omelasticsearch: No servers specified, using localhost");
2085 pData->numServers = 1;
2086 pData->serverBaseUrls = malloc(sizeof(uchar*));
2087 if (pData->serverBaseUrls == NULL) {
2088 LogError(0, RS_RET_ERR, "omelasticsearch: unable to allocate buffer "
2089 "for ElasticSearch server configuration.");
2090 ABORT_FINALIZE(RS_RET_ERR);
2091 }
2092 CHKiRet(computeBaseUrl("localhost", pData->defaultPort, pData->useHttps, pData->serverBaseUrls));
2093 }
2094
2095 if(pData->searchIndex == NULL)
2096 pData->searchIndex = (uchar*) strdup("system");
2097 if(pData->searchType == NULL)
2098 pData->searchType = (uchar*) strdup("events");
2099
2100 if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
2101 LogError(0, RS_RET_CONFIG_ERROR,
2102 "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
2103 ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
2104 }
2105
2106 if (pData->retryFailures) {
2107 CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
2108 ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
2109 ratelimitSetNoTimeCache(pData->ratelimiter);
2110 }
2111
2112 /* node created, let's add to list of instance configs for the module */
2113 if(loadModConf->tail == NULL) {
2114 loadModConf->tail = loadModConf->root = pData;
2115 } else {
2116 loadModConf->tail->next = pData;
2117 loadModConf->tail = pData;
2118 }
2119
2120 CODE_STD_FINALIZERnewActInst
2121 cnfparamvalsDestruct(pvals, &actpblk);
2122 if (serverParam)
2123 free(serverParam);
2124 ENDnewActInst
2125
2126
2127 BEGINbeginCnfLoad
2128 CODESTARTbeginCnfLoad
2129 loadModConf = pModConf;
2130 pModConf->pConf = pConf;
2131 pModConf->root = pModConf->tail = NULL;
2132 ENDbeginCnfLoad
2133
2134
2135 BEGINendCnfLoad
2136 CODESTARTendCnfLoad
2137 loadModConf = NULL; /* done loading */
2138 ENDendCnfLoad
2139
2140
2141 BEGINcheckCnf
2142 instanceConf_t *inst;
2143 CODESTARTcheckCnf
2144 for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
2145 ruleset_t *pRuleset;
2146 rsRetVal localRet;
2147
2148 if (inst->retryRulesetName) {
2149 localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
2150 if(localRet == RS_RET_NOT_FOUND) {
2151 LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
2152 "no retry ruleset will be used", inst->retryRulesetName);
2153 } else {
2154 inst->retryRuleset = pRuleset;
2155 }
2156 }
2157 }
2158 ENDcheckCnf
2159
2160
2161 BEGINactivateCnf
2162 CODESTARTactivateCnf
2163 ENDactivateCnf
2164
2165
2166 BEGINfreeCnf
2167 CODESTARTfreeCnf
2168 ENDfreeCnf
2169
2170
2171 BEGINdoHUP
2172 CODESTARTdoHUP
2173 pthread_mutex_lock(&pData->mutErrFile);
2174 if(pData->fdErrFile != -1) {
2175 close(pData->fdErrFile);
2176 pData->fdErrFile = -1;
2177 }
2178 pthread_mutex_unlock(&pData->mutErrFile);
2179 ENDdoHUP
2180
2181
2182 BEGINmodExit
2183 CODESTARTmodExit
2184 if(pInputName != NULL)
2185 prop.Destruct(&pInputName);
2186 curl_global_cleanup();
2187 statsobj.Destruct(&indexStats);
2188 objRelease(statsobj, CORE_COMPONENT);
2189 objRelease(prop, CORE_COMPONENT);
2190 objRelease(ruleset, CORE_COMPONENT);
2191 ENDmodExit
2192
2193 NO_LEGACY_CONF_parseSelectorAct
2194
2195 BEGINqueryEtryPt
2196 CODESTARTqueryEtryPt
2197 CODEqueryEtryPt_STD_OMOD_QUERIES
2198 CODEqueryEtryPt_STD_OMOD8_QUERIES
2199 CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
2200 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
2201 CODEqueryEtryPt_doHUP
2202 CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
2203 CODEqueryEtryPt_STD_CONF2_QUERIES
2204 ENDqueryEtryPt
2205
2206
2207 BEGINmodInit()
2208 CODESTARTmodInit
2209 *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
2210 CODEmodInit_QueryRegCFSLineHdlr
2211 CHKiRet(objUse(statsobj, CORE_COMPONENT));
2212 CHKiRet(objUse(prop, CORE_COMPONENT));
2213 CHKiRet(objUse(ruleset, CORE_COMPONENT));
2214
2215 if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
2216 LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
2217 ABORT_FINALIZE(RS_RET_OBJ_CREATION_FAILED);
2218 }
2219
2220 /* support statistics gathering */
2221 CHKiRet(statsobj.Construct(&indexStats));
2222 CHKiRet(statsobj.SetName(indexStats, (uchar *)"omelasticsearch"));
2223 CHKiRet(statsobj.SetOrigin(indexStats, (uchar *)"omelasticsearch"));
2224 STATSCOUNTER_INIT(indexSubmit, mutIndexSubmit);
2225 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"submitted",
2226 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSubmit));
2227 STATSCOUNTER_INIT(indexHTTPFail, mutIndexHTTPFail);
2228 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.http",
2229 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPFail));
2230 STATSCOUNTER_INIT(indexHTTPReqFail, mutIndexHTTPReqFail);
2231 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.httprequests",
2232 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexHTTPReqFail));
2233 STATSCOUNTER_INIT(checkConnFail, mutCheckConnFail);
2234 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.checkConn",
2235 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &checkConnFail));
2236 STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
2237 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
2238 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
2239 STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
2240 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
2241 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
2242 STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
2243 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
2244 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
2245 STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
2246 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
2247 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
2248 STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
2249 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
2250 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
2251 STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
2252 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
2253 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
2254 STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
2255 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
2256 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
2257 STATSCOUNTER_INIT(rebinds, mutRebinds);
2258 CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"rebinds",
2259 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &rebinds));
2260 CHKiRet(statsobj.ConstructFinalize(indexStats));
2261 CHKiRet(prop.Construct(&pInputName));
2262 CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
2263 CHKiRet(prop.ConstructFinalize(pInputName));
2264 ENDmodInit
2265
2266 /* vi:set ai:
2267 */
2268