1 /* mmgrok.c
2  * Grok the message is parsed into a structured json data inside JSON.
3  */
4 #include "config.h"
5 #include "rsyslog.h"
6 #include <stdio.h>
7 #include <stdarg.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <assert.h>
11 #include <signal.h>
12 #include <errno.h>
13 #include <unistd.h>
14 #include <ctype.h>
15 #include <json.h>
16 #include <grok.h>
17 #include <glib.h>
18 #include "conf.h"
19 #include "syslogd-types.h"
20 #include "template.h"
21 #include "module-template.h"
22 #include "errmsg.h"
23 #include "cfsysline.h"
24 #include "dirty.h"
25 
26 MODULE_TYPE_OUTPUT
27 MODULE_TYPE_NOKEEP
28 MODULE_CNFNAME("mmgrok");
29 
30 static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);
31 
32 DEF_OMOD_STATIC_DATA
33 
34 typedef struct result_s{
35 	char *key;
36 	int key_len;
37 	const char *value;
38 	int value_len;
39 	char *type;
40 }result_t;
41 
42 /* config variables */
43 typedef struct _instanceData
44 {
45 	char *pszPatternDir;
46 	char *pszMatch;
47 	char *pszSource;
48 	char *pszTarget;/* as a json root for store parse json data */
49 	smsg_t *pmsg;    /* store  origin messages*/
50 }instanceData;
51 
52 typedef struct wrkrInstanceData{
53 	instanceData *pData;
54 }wrkrInstanceData_t;
55 
56 struct modConfData_s{
57 	rsconf_t   *pConf;/* our overall config object */
58 };
59 
60 static  modConfData_t   *loadModConf = NULL;
61 static  modConfData_t   *runModConf   = NULL;
62 
63 /* action (instance) paramters */
64 static struct cnfparamdescr actpdescr[]={
65 	{"patterndir",eCmdHdlrString,0},
66 	{"match",eCmdHdlrString,0},
67 	{"source",eCmdHdlrString,0},
68 	{"target",eCmdHdlrString,0},
69 };
70 
71 static struct cnfparamblk actpblk =
72 {
73 	CNFPARAMBLK_VERSION,
74 	sizeof(actpdescr)/sizeof(struct cnfparamdescr),
75 	actpdescr
76 };
77 
78 BEGINbeginCnfLoad
79 CODESTARTbeginCnfLoad
80 	loadModConf = pModConf;
81 	pModConf->pConf = pConf;
82 ENDbeginCnfLoad
83 
84 BEGINendCnfLoad
85 CODESTARTendCnfLoad
86 ENDendCnfLoad
87 
88 BEGINcheckCnf
89 CODESTARTcheckCnf
90 ENDcheckCnf
91 
92 BEGINactivateCnf
93 CODESTARTactivateCnf
94 	runModConf = pModConf;
95 ENDactivateCnf
96 
97 BEGINfreeCnf
98 CODESTARTfreeCnf
99 ENDfreeCnf
100 
101 BEGINcreateInstance
102 CODESTARTcreateInstance
103 ENDcreateInstance
104 
105 BEGINcreateWrkrInstance
106 CODESTARTcreateWrkrInstance
107 ENDcreateWrkrInstance
108 
109 BEGINisCompatibleWithFeature
110 CODESTARTisCompatibleWithFeature
111 ENDisCompatibleWithFeature
112 
113 BEGINfreeInstance
114 CODESTARTfreeInstance
115 	 free(pData->pszPatternDir);
116 	 free(pData->pszMatch);
117 	 free(pData->pszSource);
118 	 free(pData->pszTarget);
119 ENDfreeInstance
120 
121 BEGINfreeWrkrInstance
122 CODESTARTfreeWrkrInstance
123 ENDfreeWrkrInstance
124 
125 
setInstParamDefaults(instanceData * pData)126 static inline void setInstParamDefaults(instanceData *pData)
127 {
128 	pData->pszPatternDir= NULL;
129 	pData->pszMatch = NULL;
130 	pData->pszSource = NULL;
131 	pData->pszTarget = NULL;
132 	pData->pmsg = NULL;
133 }
134 
135 
136 BEGINnewActInst
137 	struct cnfparamvals *pvals;
138 	int i;
139 CODESTARTnewActInst
140 	DBGPRINTF("newActInst (mmgrok)\n");
141 	if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
142 		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
143 	}
144 
145 	CODE_STD_STRING_REQUESTnewActInst(1)
146 	CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
147 	CHKiRet(createInstance(&pData));
148 	setInstParamDefaults(pData);
149 
150 	for(i = 0 ; i < actpblk.nParams ; ++i) {
151 		if(!pvals[i].bUsed)
152 			continue;
153 		if(!strcmp(actpblk.descr[i].name, "patterndir")) {
154 			pData->pszPatternDir= es_str2cstr(pvals[i].val.d.estr, NULL);
155 			continue;
156 		}
157 		else if(!strcmp(actpblk.descr[i].name, "match")) {
158 			pData->pszMatch = es_str2cstr(pvals[i].val.d.estr, NULL);
159 			continue;
160 		}
161 		else if(!strcmp(actpblk.descr[i].name, "source")) {
162 			pData->pszSource= es_str2cstr(pvals[i].val.d.estr, NULL);
163 			continue;
164 		}
165 		else  if(!strcmp(actpblk.descr[i].name,"target"))
166 	            {
167 	                    pData->pszTarget=es_str2cstr(pvals[i].val.d.estr,NULL);
168 	                    continue;
169 	            }
170 		else{
171 	         	DBGPRINTF("mmgrok: program error, non-handled "
172 	               	"param '%s'\n", actpblk.descr[i].name);
173 		}
174 	}
175 	if(pData->pszTarget == NULL) {
176 		CHKmalloc(pData->pszTarget = strdup("!"));
177 	}
178 CODE_STD_FINALIZERnewActInst
179 	cnfparamvalsDestruct(pvals, &actpblk);
180 ENDnewActInst
181 
182 BEGINdbgPrintInstInfo
183 CODESTARTdbgPrintInstInfo
184 	DBGPRINTF("mmgrok\n");
185 ENDdbgPrintInstInfo
186 
187 BEGINtryResume
188 CODESTARTtryResume
189 ENDtryResume
190 
CreateGrok(void)191 static inline grok_t *CreateGrok(void)
192 {
193 	grok_t  *grok = grok_new();
194 	if(grok == NULL){
195 	    DBGPRINTF("mmgrok: create a grok faile!");
196 	    exit(1);
197 	}
198 	grok_init(grok);
199 	return grok;
200 }
201 
202 /* the parseing is complate message into json */
203 static rsRetVal
smsg_to_json(GList * list,instanceData * pData)204 smsg_to_json(GList *list,instanceData *pData)
205 {
206 	GList *it= list;
207 
208 	struct json_object *json;
209 	struct json_object *jval;
210 
211 	DEFiRet;
212 
213 	json = json_object_new_object();
214 	if(json == NULL)
215 	{
216 	        ABORT_FINALIZE(RS_RET_ERR);
217 	}
218 	for(;it;it= it->next)
219 	{
220 	int key_len = ((result_t *)it->data)->key_len;
221 	    char *key = (char *)malloc(key_len+1);
222 	    snprintf(key,key_len+1,"%.*s",key_len,((result_t *)it->data)->key);
223 	int value_len = ((result_t *)it->data)->value_len;
224 	char *value = (char *)malloc(value_len+1);
225 	    snprintf(value,value_len+1,"%.*s",value_len,((result_t*)it->data)->value);
226 	jval = json_object_new_string(value);
227 	json_object_object_add(json,key,jval);
228 	free(key);
229 	free(value);
230 	}
231 	msgAddJSON(pData->pmsg,(uchar*)pData->pszTarget,json,0,0);
232 finalize_it:
233 	RETiRet;
234 }
235 
236 /* store parse result ,use list in glib*/
237 static rsRetVal
parse_result_store(const grok_match_t gm,instanceData * pData)238 parse_result_store(const grok_match_t gm,instanceData *pData)
239 {
240 	    GList *re_list = NULL;
241 	    char  *pname;
242 	    const char  *pdata;
243 	    int    pname_len,pdata_len;
244 
245 	    char *key;
246 	    char *type;
247 	    DEFiRet;
248 
249 	    grok_match_walk_init(&gm); //grok API
250 
251 	    while(grok_match_walk_next(&gm,&pname,&pname_len,&pdata,&pdata_len) == 0) {
252 	        /* parse key and value type from patterns */
253 	        key = strchr(pname,':');
254 
255 	        if(key!=NULL) {
256 	            int key_len;
257 	            result_t *result = g_new0(result_t,1);
258 	            key_len = pname_len - ((key+1) - pname);
259 	            key = key+1;
260 	            pname_len = key_len;
261 	            type = strchr(key,':');
262 	            int type_len;
263 	            if(type != NULL) {
264 	                key_len = (type - key);
265 	                type = type+1;
266 	                type_len = pname_len - key_len -1;
267 			type[type_len] = '\0';
268 	            } else {
269 		    	type = (char*)"null";
270 		    }
271 	            /* store parse result into list */
272 	            result->key = key;
273 	            result->key_len = key_len;
274 	            result->value = pdata;
275 	            result->value_len = pdata_len;
276 	            result->type = type;
277 	            /* the value of merger the same key*/
278 	            re_list = g_list_append(re_list,result);
279 	        }
280 	    }
281 	    smsg_to_json(re_list,pData);
282 	    g_list_free(re_list);
283 	    grok_match_walk_end(&gm);
284 	RETiRet;
285 }
286 
287 /* motify message for per line */
288 static rsRetVal
MotifyLine(char * line,grok_t * grok,instanceData * pData)289 MotifyLine(char *line,grok_t *grok,instanceData *pData)
290 {
291 	grok_match_t  gm;
292 	DEFiRet;
293 	grok_patterns_import_from_file(grok,pData->pszPatternDir);
294 	int compile = grok_compile(grok,pData->pszMatch);
295 	if(compile!=GROK_OK)
296 	{
297 	    DBGPRINTF("mmgrok: grok_compile faile!exit code: %d\n",compile);
298 	ABORT_FINALIZE(RS_RET_ERR);
299 	}
300 	int exe = grok_exec(grok,line,&gm);
301 	if(exe!=GROK_OK)
302 	{
303 	    DBGPRINTF("mmgrok: grok_exec faile!exit code: %d\n",exe);
304 	ABORT_FINALIZE(RS_RET_ERR);
305 	}
306 	parse_result_store(gm,pData);
307 finalize_it:
308 	RETiRet;
309 }
310 
311 /* motify rsyslog messages */
312 static rsRetVal
MotifyMessage(instanceData * pData)313 MotifyMessage(instanceData *pData)
314 {
315 	char *saveptr = NULL;
316 	DEFiRet;
317 	grok_t  *grok = CreateGrok();
318 	char     *msg = strdup(pData->pszSource);
319 	char     *line = NULL;
320 	line = strtok_r(msg, "\n", &saveptr);
321 	while(line!=NULL) {
322 		MotifyLine(line,grok,pData);
323 		line = strtok_r(NULL, "\n", &saveptr);
324 	}
325 	free(msg);msg=NULL;
326 	RETiRet;
327 }
328 
329 
330 BEGINdoAction_NoStrings
331 	smsg_t **ppMsg = (smsg_t **) pMsgData;
332 	smsg_t *pMsg = ppMsg[0];
333 	uchar *buf;
334 	    instanceData *pData;
335 
336 CODESTARTdoAction
337 	    pData = pWrkrData->pData;
338 	buf = getMSG(pMsg);
339 	    pData->pmsg = pMsg;
340 	while(*buf && isspace(*buf)) {
341 		++buf;
342 	}
343 
344 	if(*buf == '\0' ) {
345 		DBGPRINTF("mmgrok:  not msg for mmgrok!");
346 		ABORT_FINALIZE(RS_RET_NO_CEE_MSG);
347 	}
348 	pData->pszSource = (char *)buf;
349 	CHKiRet(MotifyMessage(pData));
350 
351 finalize_it:
352 ENDdoAction
353 
354 BEGINparseSelectorAct
355 CODESTARTparseSelectorAct
356 CODE_STD_STRING_REQUESTparseSelectorAct(1)
357 	if(strncmp((char*) p, ":mmgrok:", sizeof(":mmgrok:") - 1)) {
358 		ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
359 	}
360 
361 	p += sizeof(":mmgrok:") - 1; /* eat indicator sequence  (-1 because of '\0'!) */
362 	CHKiRet(createInstance(&pData));
363 
364 	if(*(p-1) == ';')
365 		--p;
366 	CHKiRet(cflineParseTemplateName(&p, *ppOMSR, 0, OMSR_TPL_AS_MSG, (uchar*) "RSYSLOG_FileFormat"));
367 CODE_STD_FINALIZERparseSelectorAct
368 ENDparseSelectorAct
369 
370 BEGINmodExit
371 CODESTARTmodExit
372 ENDmodExit
373 
374 
375 BEGINqueryEtryPt
376 CODESTARTqueryEtryPt
377 CODEqueryEtryPt_STD_OMOD_QUERIES
378 CODEqueryEtryPt_STD_OMOD8_QUERIES
379 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
380 CODEqueryEtryPt_STD_CONF2_QUERIES
381 ENDqueryEtryPt
382 
resetConfigVariables(uchar * pp,void * pVal)383 static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
384 {
385 	DEFiRet;
386 	RETiRet;
387 }
388 
389 BEGINmodInit()
390 	rsRetVal localRet;
391 	rsRetVal (*pomsrGetSupportedTplOpts)(unsigned long *pOpts);
392 	unsigned long opts;
393 	int bMsgPassingSupported;
394 CODESTARTmodInit
395 	*ipIFVersProvided = CURR_MOD_IF_VERSION;
396 CODEmodInit_QueryRegCFSLineHdlr
397 	DBGPRINTF("mmgrok: module compiled with rsyslog version %s.\n", VERSION);
398 	bMsgPassingSupported = 0;
399 	localRet = pHostQueryEtryPt((uchar*)"OMSRgetSupportedTplOpts",
400 			&pomsrGetSupportedTplOpts);
401 	if(localRet == RS_RET_OK) {
402 		CHKiRet((*pomsrGetSupportedTplOpts)(&opts));
403 		if(opts & OMSR_TPL_AS_MSG)
404 			bMsgPassingSupported = 1;
405 	} else if(localRet != RS_RET_ENTRY_POINT_NOT_FOUND) {
406 		ABORT_FINALIZE(localRet); /* Something else went wrong, not acceptable */
407 	}
408 
409 	if(!bMsgPassingSupported) {
410 		DBGPRINTF("mmgrok: msg-passing is not supported by rsyslog core, "
411 			  "can not continue.\n");
412 		ABORT_FINALIZE(RS_RET_NO_MSG_PASSING);
413 	}
414 
415 
416 	CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
417 				    resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
418 ENDmodInit
419