1 /* mmgrok.c
2 * Grok the message is parsed into a structured json data inside JSON.
3 */
4 #include "config.h"
5 #include "rsyslog.h"
6 #include <stdio.h>
7 #include <stdarg.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <assert.h>
11 #include <signal.h>
12 #include <errno.h>
13 #include <unistd.h>
14 #include <ctype.h>
15 #include <json.h>
16 #include <grok.h>
17 #include <glib.h>
18 #include "conf.h"
19 #include "syslogd-types.h"
20 #include "template.h"
21 #include "module-template.h"
22 #include "errmsg.h"
23 #include "cfsysline.h"
24 #include "dirty.h"
25
26 MODULE_TYPE_OUTPUT
27 MODULE_TYPE_NOKEEP
28 MODULE_CNFNAME("mmgrok");
29
30 static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
31
32 DEF_OMOD_STATIC_DATA
33
34 typedef struct result_s{
35 char *key;
36 int key_len;
37 const char *value;
38 int value_len;
39 char *type;
40 }result_t;
41
42 /* config variables */
43 typedef struct _instanceData
44 {
45 char *pszPatternDir;
46 char *pszMatch;
47 char *pszSource;
48 char *pszTarget;/* as a json root for store parse json data */
49 smsg_t *pmsg; /* store origin messages*/
50 }instanceData;
51
52 typedef struct wrkrInstanceData{
53 instanceData *pData;
54 }wrkrInstanceData_t;
55
56 struct modConfData_s{
57 rsconf_t *pConf;/* our overall config object */
58 };
59
60 static modConfData_t *loadModConf = NULL;
61 static modConfData_t *runModConf = NULL;
62
63 /* action (instance) paramters */
64 static struct cnfparamdescr actpdescr[]={
65 {"patterndir",eCmdHdlrString,0},
66 {"match",eCmdHdlrString,0},
67 {"source",eCmdHdlrString,0},
68 {"target",eCmdHdlrString,0},
69 };
70
71 static struct cnfparamblk actpblk =
72 {
73 CNFPARAMBLK_VERSION,
74 sizeof(actpdescr)/sizeof(struct cnfparamdescr),
75 actpdescr
76 };
77
78 BEGINbeginCnfLoad
79 CODESTARTbeginCnfLoad
80 loadModConf = pModConf;
81 pModConf->pConf = pConf;
82 ENDbeginCnfLoad
83
84 BEGINendCnfLoad
85 CODESTARTendCnfLoad
86 ENDendCnfLoad
87
88 BEGINcheckCnf
89 CODESTARTcheckCnf
90 ENDcheckCnf
91
92 BEGINactivateCnf
93 CODESTARTactivateCnf
94 runModConf = pModConf;
95 ENDactivateCnf
96
97 BEGINfreeCnf
98 CODESTARTfreeCnf
99 ENDfreeCnf
100
101 BEGINcreateInstance
102 CODESTARTcreateInstance
103 ENDcreateInstance
104
105 BEGINcreateWrkrInstance
106 CODESTARTcreateWrkrInstance
107 ENDcreateWrkrInstance
108
109 BEGINisCompatibleWithFeature
110 CODESTARTisCompatibleWithFeature
111 ENDisCompatibleWithFeature
112
113 BEGINfreeInstance
114 CODESTARTfreeInstance
115 free(pData->pszPatternDir);
116 free(pData->pszMatch);
117 free(pData->pszSource);
118 free(pData->pszTarget);
119 ENDfreeInstance
120
121 BEGINfreeWrkrInstance
122 CODESTARTfreeWrkrInstance
123 ENDfreeWrkrInstance
124
125
setInstParamDefaults(instanceData * pData)126 static inline void setInstParamDefaults(instanceData *pData)
127 {
128 pData->pszPatternDir= NULL;
129 pData->pszMatch = NULL;
130 pData->pszSource = NULL;
131 pData->pszTarget = NULL;
132 pData->pmsg = NULL;
133 }
134
135
136 BEGINnewActInst
137 struct cnfparamvals *pvals;
138 int i;
139 CODESTARTnewActInst
140 DBGPRINTF("newActInst (mmgrok)\n");
141 if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
142 ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
143 }
144
145 CODE_STD_STRING_REQUESTnewActInst(1)
146 CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
147 CHKiRet(createInstance(&pData));
148 setInstParamDefaults(pData);
149
150 for(i = 0 ; i < actpblk.nParams ; ++i) {
151 if(!pvals[i].bUsed)
152 continue;
153 if(!strcmp(actpblk.descr[i].name, "patterndir")) {
154 pData->pszPatternDir= es_str2cstr(pvals[i].val.d.estr, NULL);
155 continue;
156 }
157 else if(!strcmp(actpblk.descr[i].name, "match")) {
158 pData->pszMatch = es_str2cstr(pvals[i].val.d.estr, NULL);
159 continue;
160 }
161 else if(!strcmp(actpblk.descr[i].name, "source")) {
162 pData->pszSource= es_str2cstr(pvals[i].val.d.estr, NULL);
163 continue;
164 }
165 else if(!strcmp(actpblk.descr[i].name,"target"))
166 {
167 pData->pszTarget=es_str2cstr(pvals[i].val.d.estr,NULL);
168 continue;
169 }
170 else{
171 DBGPRINTF("mmgrok: program error, non-handled "
172 "param '%s'\n", actpblk.descr[i].name);
173 }
174 }
175 if(pData->pszTarget == NULL) {
176 CHKmalloc(pData->pszTarget = strdup("!"));
177 }
178 CODE_STD_FINALIZERnewActInst
179 cnfparamvalsDestruct(pvals, &actpblk);
180 ENDnewActInst
181
182 BEGINdbgPrintInstInfo
183 CODESTARTdbgPrintInstInfo
184 DBGPRINTF("mmgrok\n");
185 ENDdbgPrintInstInfo
186
187 BEGINtryResume
188 CODESTARTtryResume
189 ENDtryResume
190
CreateGrok(void)191 static inline grok_t *CreateGrok(void)
192 {
193 grok_t *grok = grok_new();
194 if(grok == NULL){
195 DBGPRINTF("mmgrok: create a grok faile!");
196 exit(1);
197 }
198 grok_init(grok);
199 return grok;
200 }
201
202 /* the parseing is complate message into json */
203 static rsRetVal
smsg_to_json(GList * list,instanceData * pData)204 smsg_to_json(GList *list,instanceData *pData)
205 {
206 GList *it= list;
207
208 struct json_object *json;
209 struct json_object *jval;
210
211 DEFiRet;
212
213 json = json_object_new_object();
214 if(json == NULL)
215 {
216 ABORT_FINALIZE(RS_RET_ERR);
217 }
218 for(;it;it= it->next)
219 {
220 int key_len = ((result_t *)it->data)->key_len;
221 char *key = (char *)malloc(key_len+1);
222 snprintf(key,key_len+1,"%.*s",key_len,((result_t *)it->data)->key);
223 int value_len = ((result_t *)it->data)->value_len;
224 char *value = (char *)malloc(value_len+1);
225 snprintf(value,value_len+1,"%.*s",value_len,((result_t*)it->data)->value);
226 jval = json_object_new_string(value);
227 json_object_object_add(json,key,jval);
228 free(key);
229 free(value);
230 }
231 msgAddJSON(pData->pmsg,(uchar*)pData->pszTarget,json,0,0);
232 finalize_it:
233 RETiRet;
234 }
235
236 /* store parse result ,use list in glib*/
237 static rsRetVal
parse_result_store(const grok_match_t gm,instanceData * pData)238 parse_result_store(const grok_match_t gm,instanceData *pData)
239 {
240 GList *re_list = NULL;
241 char *pname;
242 const char *pdata;
243 int pname_len,pdata_len;
244
245 char *key;
246 char *type;
247 DEFiRet;
248
249 grok_match_walk_init(&gm); //grok API
250
251 while(grok_match_walk_next(&gm,&pname,&pname_len,&pdata,&pdata_len) == 0) {
252 /* parse key and value type from patterns */
253 key = strchr(pname,':');
254
255 if(key!=NULL) {
256 int key_len;
257 result_t *result = g_new0(result_t,1);
258 key_len = pname_len - ((key+1) - pname);
259 key = key+1;
260 pname_len = key_len;
261 type = strchr(key,':');
262 int type_len;
263 if(type != NULL) {
264 key_len = (type - key);
265 type = type+1;
266 type_len = pname_len - key_len -1;
267 type[type_len] = '\0';
268 } else {
269 type = (char*)"null";
270 }
271 /* store parse result into list */
272 result->key = key;
273 result->key_len = key_len;
274 result->value = pdata;
275 result->value_len = pdata_len;
276 result->type = type;
277 /* the value of merger the same key*/
278 re_list = g_list_append(re_list,result);
279 }
280 }
281 smsg_to_json(re_list,pData);
282 g_list_free(re_list);
283 grok_match_walk_end(&gm);
284 RETiRet;
285 }
286
287 /* motify message for per line */
288 static rsRetVal
MotifyLine(char * line,grok_t * grok,instanceData * pData)289 MotifyLine(char *line,grok_t *grok,instanceData *pData)
290 {
291 grok_match_t gm;
292 DEFiRet;
293 grok_patterns_import_from_file(grok,pData->pszPatternDir);
294 int compile = grok_compile(grok,pData->pszMatch);
295 if(compile!=GROK_OK)
296 {
297 DBGPRINTF("mmgrok: grok_compile faile!exit code: %d\n",compile);
298 ABORT_FINALIZE(RS_RET_ERR);
299 }
300 int exe = grok_exec(grok,line,&gm);
301 if(exe!=GROK_OK)
302 {
303 DBGPRINTF("mmgrok: grok_exec faile!exit code: %d\n",exe);
304 ABORT_FINALIZE(RS_RET_ERR);
305 }
306 parse_result_store(gm,pData);
307 finalize_it:
308 RETiRet;
309 }
310
311 /* motify rsyslog messages */
312 static rsRetVal
MotifyMessage(instanceData * pData)313 MotifyMessage(instanceData *pData)
314 {
315 char *saveptr = NULL;
316 DEFiRet;
317 grok_t *grok = CreateGrok();
318 char *msg = strdup(pData->pszSource);
319 char *line = NULL;
320 line = strtok_r(msg, "\n", &saveptr);
321 while(line!=NULL) {
322 MotifyLine(line,grok,pData);
323 line = strtok_r(NULL, "\n", &saveptr);
324 }
325 free(msg);msg=NULL;
326 RETiRet;
327 }
328
329
330 BEGINdoAction_NoStrings
331 smsg_t **ppMsg = (smsg_t **) pMsgData;
332 smsg_t *pMsg = ppMsg[0];
333 uchar *buf;
334 instanceData *pData;
335
336 CODESTARTdoAction
337 pData = pWrkrData->pData;
338 buf = getMSG(pMsg);
339 pData->pmsg = pMsg;
340 while(*buf && isspace(*buf)) {
341 ++buf;
342 }
343
344 if(*buf == '\0' ) {
345 DBGPRINTF("mmgrok: not msg for mmgrok!");
346 ABORT_FINALIZE(RS_RET_NO_CEE_MSG);
347 }
348 pData->pszSource = (char *)buf;
349 CHKiRet(MotifyMessage(pData));
350
351 finalize_it:
352 ENDdoAction
353
354 BEGINparseSelectorAct
355 CODESTARTparseSelectorAct
356 CODE_STD_STRING_REQUESTparseSelectorAct(1)
357 if(strncmp((char*) p, ":mmgrok:", sizeof(":mmgrok:") - 1)) {
358 ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
359 }
360
361 p += sizeof(":mmgrok:") - 1; /* eat indicator sequence (-1 because of '\0'!) */
362 CHKiRet(createInstance(&pData));
363
364 if(*(p-1) == ';')
365 --p;
366 CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_MSG, (uchar*) "RSYSLOG_FileFormat"));
367 CODE_STD_FINALIZERparseSelectorAct
368 ENDparseSelectorAct
369
370 BEGINmodExit
371 CODESTARTmodExit
372 ENDmodExit
373
374
375 BEGINqueryEtryPt
376 CODESTARTqueryEtryPt
377 CODEqueryEtryPt_STD_OMOD_QUERIES
378 CODEqueryEtryPt_STD_OMOD8_QUERIES
379 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
380 CODEqueryEtryPt_STD_CONF2_QUERIES
381 ENDqueryEtryPt
382
resetConfigVariables(uchar * pp,void * pVal)383 static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
384 {
385 DEFiRet;
386 RETiRet;
387 }
388
389 BEGINmodInit()
390 rsRetVal localRet;
391 rsRetVal (*pomsrGetSupportedTplOpts)(unsigned long *pOpts);
392 unsigned long opts;
393 int bMsgPassingSupported;
394 CODESTARTmodInit
395 *ipIFVersProvided = CURR_MOD_IF_VERSION;
396 CODEmodInit_QueryRegCFSLineHdlr
397 DBGPRINTF("mmgrok: module compiled with rsyslog version %s.\n", VERSION);
398 bMsgPassingSupported = 0;
399 localRet = pHostQueryEtryPt((uchar*)"OMSRgetSupportedTplOpts",
400 &pomsrGetSupportedTplOpts);
401 if(localRet == RS_RET_OK) {
402 CHKiRet((*pomsrGetSupportedTplOpts)(&opts));
403 if(opts & OMSR_TPL_AS_MSG)
404 bMsgPassingSupported = 1;
405 } else if(localRet != RS_RET_ENTRY_POINT_NOT_FOUND) {
406 ABORT_FINALIZE(localRet); /* Something else went wrong, not acceptable */
407 }
408
409 if(!bMsgPassingSupported) {
410 DBGPRINTF("mmgrok: msg-passing is not supported by rsyslog core, "
411 "can not continue.\n");
412 ABORT_FINALIZE(RS_RET_NO_MSG_PASSING);
413 }
414
415
416 CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
417 resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
418 ENDmodInit
419