1 /* mmsequence.c
2 * Generate a number based on some sequence.
3 *
4 * Copyright 2013 pavel@levshin.spb.ru.
5 *
6 * Based on: mmcount.c
7 * Copyright 2013 Red Hat Inc.
8 *
9 * This file is part of rsyslog.
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 * -or-
17 * see COPYING.ASL20 in the source distribution
18 *
19 * Unless required by applicable law or agreed to in writing, software
20 * distributed under the License is distributed on an "AS IS" BASIS,
21 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 * See the License for the specific language governing permissions and
23 * limitations under the License.
24 */
25 #include "config.h"
26 #include "rsyslog.h"
27 #include <stdio.h>
28 #include <stdarg.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <assert.h>
32 #include <signal.h>
33 #include <errno.h>
34 #include <unistd.h>
35 #include <stdint.h>
36 #include <time.h>
37 #include <limits.h>
38 #include <json.h>
39 #include <pthread.h>
40 #include "conf.h"
41 #include "syslogd-types.h"
42 #include "srUtils.h"
43 #include "template.h"
44 #include "module-template.h"
45 #include "errmsg.h"
46 #include "hashtable.h"
47
/* Variable name used for the generated number when the action does not
 * configure "var"; installed as the default by setInstParamDefaults(). */
#define JSON_VAR_NAME "$!mmsequence"
49
/* Ways the emitted number can be produced; chosen with the "mode"
 * action parameter. */
enum mmSequenceModes {
	mmSequenceRandom = 0,		/* mode="random" */
	mmSequencePerInstance = 1,	/* mode="instance" */
	mmSequencePerKey = 2		/* mode="key" */
};
55
56 MODULE_TYPE_OUTPUT
57 MODULE_TYPE_NOKEEP
58 MODULE_CNFNAME("mmsequence")
59
60
61 DEF_OMOD_STATIC_DATA
62
63 /* config variables */
64
65 typedef struct _instanceData {
66 enum mmSequenceModes mode;
67 int valueFrom;
68 int valueTo;
69 int step;
70 unsigned int seed;
71 int value;
72 char *pszKey;
73 char *pszVar;
74 } instanceData;
75
76 typedef struct wrkrInstanceData {
77 instanceData *pData;
78 } wrkrInstanceData_t;
79
80 struct modConfData_s {
81 rsconf_t *pConf; /* our overall config object */
82 };
83 static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
84 static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
85
86
87 /* tables for interfacing with the v6 config system */
88 /* action (instance) parameters */
89 static struct cnfparamdescr actpdescr[] = {
90 { "mode", eCmdHdlrGetWord, 0 },
91 { "from", eCmdHdlrNonNegInt, 0 },
92 { "to", eCmdHdlrPositiveInt, 0 },
93 { "step", eCmdHdlrNonNegInt, 0 },
94 { "key", eCmdHdlrGetWord, 0 },
95 { "var", eCmdHdlrGetWord, 0 },
96 };
97 static struct cnfparamblk actpblk =
98 { CNFPARAMBLK_VERSION,
99 sizeof(actpdescr)/sizeof(struct cnfparamdescr),
100 actpdescr
101 };
102
103 /* table for key-counter pairs */
/* Key -> counter table used in key mode. It is created on first need in
 * newActInst and accessed under ght_mutex. */
static struct hashtable *ght = NULL;
static pthread_mutex_t ght_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Guards the per-instance counter in instance mode. This is one mutex
 * for the whole module, not one per instance. */
static pthread_mutex_t inst_mutex = PTHREAD_MUTEX_INITIALIZER;
108
109 BEGINbeginCnfLoad
110 CODESTARTbeginCnfLoad
111 loadModConf = pModConf;
112 pModConf->pConf = pConf;
113 ENDbeginCnfLoad
114
115 BEGINendCnfLoad
116 CODESTARTendCnfLoad
117 ENDendCnfLoad
118
119 BEGINcheckCnf
120 CODESTARTcheckCnf
121 ENDcheckCnf
122
123 BEGINactivateCnf
124 CODESTARTactivateCnf
125 runModConf = pModConf;
126 ENDactivateCnf
127
128 BEGINfreeCnf
129 CODESTARTfreeCnf
130 ENDfreeCnf
131
132
133 BEGINcreateInstance
134 CODESTARTcreateInstance
135 ENDcreateInstance
136
137 BEGINcreateWrkrInstance
138 CODESTARTcreateWrkrInstance
139 ENDcreateWrkrInstance
140
141
142 BEGINisCompatibleWithFeature
143 CODESTARTisCompatibleWithFeature
144 ENDisCompatibleWithFeature
145
146
147 BEGINfreeInstance
148 CODESTARTfreeInstance
149 ENDfreeInstance
150
151 BEGINfreeWrkrInstance
152 CODESTARTfreeWrkrInstance
153 ENDfreeWrkrInstance
154
155
156 static inline void
setInstParamDefaults(instanceData * pData)157 setInstParamDefaults(instanceData *pData)
158 {
159 pData->mode = mmSequencePerInstance;
160 pData->valueFrom = 0;
161 pData->valueTo = INT_MAX;
162 pData->step = 1;
163 pData->pszKey = (char*)"";
164 pData->pszVar = (char*)JSON_VAR_NAME;
165 }
166
167 BEGINnewActInst
168 struct cnfparamvals *pvals;
169 int i;
170 char *cstr;
171 CODESTARTnewActInst
172 DBGPRINTF("newActInst (mmsequence)\n");
173 if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
174 ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
175 }
176
177 CODE_STD_STRING_REQUESTnewActInst(1)
178 CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
179 CHKiRet(createInstance(&pData));
180 setInstParamDefaults(pData);
181
182 for(i = 0 ; i < actpblk.nParams ; ++i) {
183 if(!pvals[i].bUsed)
184 continue;
185 if(!strcmp(actpblk.descr[i].name, "mode")) {
186 if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"random",
187 sizeof("random")-1)) {
188 pData->mode = mmSequenceRandom;
189 } else if (!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"instance",
190 sizeof("instance")-1)) {
191 pData->mode = mmSequencePerInstance;
192 } else if (!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"key",
193 sizeof("key")-1)) {
194 pData->mode = mmSequencePerKey;
195 } else {
196 cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
197 LogError(0, RS_RET_INVLD_MODE,
198 "mmsequence: invalid mode '%s' - ignored",
199 cstr);
200 free(cstr);
201 }
202 continue;
203 }
204 if(!strcmp(actpblk.descr[i].name, "from")) {
205 pData->valueFrom = pvals[i].val.d.n;
206 continue;
207 }
208 if(!strcmp(actpblk.descr[i].name, "to")) {
209 pData->valueTo = pvals[i].val.d.n;
210 continue;
211 }
212 if(!strcmp(actpblk.descr[i].name, "step")) {
213 pData->step = pvals[i].val.d.n;
214 continue;
215 }
216 if(!strcmp(actpblk.descr[i].name, "key")) {
217 pData->pszKey = es_str2cstr(pvals[i].val.d.estr, NULL);
218 continue;
219 }
220 if(!strcmp(actpblk.descr[i].name, "var")) {
221 cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
222 if (strlen(cstr) < 3) {
223 LogError(0, RS_RET_VALUE_NOT_SUPPORTED,
224 "mmsequence: valid variable name should be at least "
225 "3 symbols long, got %s", cstr);
226 free(cstr);
227 } else if (cstr[0] != '$') {
228 LogError(0, RS_RET_VALUE_NOT_SUPPORTED,
229 "mmsequence: valid variable name should start with $,"
230 "got %s", cstr);
231 free(cstr);
232 } else {
233 pData->pszVar = cstr;
234 }
235 continue;
236 }
237 dbgprintf("mmsequence: program error, non-handled "
238 "param '%s'\n", actpblk.descr[i].name);
239 }
240 switch(pData->mode) {
241 case mmSequenceRandom:
242 pData->seed = (unsigned int)(intptr_t)pData ^ (unsigned int)time(NULL);
243 break;
244 case mmSequencePerInstance:
245 pData->value = pData->valueTo;
246 break;
247 case mmSequencePerKey:
248 if (pthread_mutex_lock(&ght_mutex)) {
249 DBGPRINTF("mmsequence: mutex lock has failed!\n");
250 ABORT_FINALIZE(RS_RET_ERR);
251 }
252 if (ght == NULL) {
253 if(NULL == (ght = create_hashtable(100, hash_from_string, key_equals_string, NULL))) {
254 pthread_mutex_unlock(&ght_mutex);
255 DBGPRINTF("mmsequence: error creating hash table!\n");
256 ABORT_FINALIZE(RS_RET_ERR);
257 }
258 }
259 pthread_mutex_unlock(&ght_mutex);
260 break;
261 default:
262 LogError(0, RS_RET_INVLD_MODE,
263 "mmsequence: this mode is not currently implemented");
264 }
265
266 CODE_STD_FINALIZERnewActInst
267 cnfparamvalsDestruct(pvals, &actpblk);
268 ENDnewActInst
269
270
271 BEGINdbgPrintInstInfo
272 CODESTARTdbgPrintInstInfo
273 ENDdbgPrintInstInfo
274
275
276 BEGINtryResume
277 CODESTARTtryResume
278 ENDtryResume
279
/* Return the counter stored under a key, creating it on first use.
 *
 * ht:      table to search and, if needed, extend
 * str:     NUL-terminated key string
 * initial: value given to a counter that has to be created
 *
 * Returns a pointer to the counter held in the table, or NULL when
 * memory for a new entry cannot be obtained or the insert fails. For a
 * new entry the key is duplicated; after a successful insert neither the
 * duplicate nor the counter is freed here.
 *
 * No locking is done in this function; the caller in this file holds
 * ght_mutex around the call.
 */
static int *
getCounter(struct hashtable *ht, char *str, int initial) {
	int *pCounter;
	char *pStr;

	pCounter = hashtable_search(ht, str);
	if(pCounter != NULL) {
		return pCounter;
	}

	/* counter is not found for the str, so add new entry and
	   return the counter */
	pStr = strdup(str);
	if(pStr == NULL) {
		DBGPRINTF("mmsequence: memory allocation for key failed\n");
		return NULL;
	}

	pCounter = malloc(sizeof *pCounter);
	if(pCounter == NULL) {
		DBGPRINTF("mmsequence: memory allocation for value failed\n");
		free(pStr);
		return NULL;
	}
	*pCounter = initial;

	if(!hashtable_insert(ht, pStr, pCounter)) {
		/* insert failed: both allocations are released again here */
		DBGPRINTF("mmsequence: inserting element into hashtable failed\n");
		free(pStr);
		free(pCounter);
		return NULL;
	}
	return pCounter;
}
312
313
314 BEGINdoAction_NoStrings
315 smsg_t **ppMsg = (smsg_t **) pMsgData;
316 smsg_t *pMsg = ppMsg[0];
317 struct json_object *json;
318 int val = 0;
319 int *pCounter;
320 instanceData *pData;
321 CODESTARTdoAction
322 pData = pWrkrData->pData;
323
324 switch(pData->mode) {
325 case mmSequenceRandom:
326 val = pData->valueFrom + (rand_r(&pData->seed) %
327 (pData->valueTo - pData->valueFrom));
328 break;
329 case mmSequencePerInstance:
330 if (!pthread_mutex_lock(&inst_mutex)) {
331 if (pData->value >= pData->valueTo - pData->step) {
332 pData->value = pData->valueFrom;
333 } else {
334 pData->value += pData->step;
335 }
336 val = pData->value;
337 pthread_mutex_unlock(&inst_mutex);
338 } else {
339 LogError(0, RS_RET_ERR,
340 "mmsequence: mutex lock has failed!");
341 }
342 break;
343 case mmSequencePerKey:
344 if (!pthread_mutex_lock(&ght_mutex)) {
345 pCounter = getCounter(ght, pData->pszKey, pData->valueTo);
346 if(pCounter) {
347 if (*pCounter >= pData->valueTo - pData->step
348 || *pCounter < pData->valueFrom ) {
349 *pCounter = pData->valueFrom;
350 } else {
351 *pCounter += pData->step;
352 }
353 val = *pCounter;
354 } else {
355 LogError(0, RS_RET_NOT_FOUND,
356 "mmsequence: unable to fetch the counter from hash");
357 }
358 pthread_mutex_unlock(&ght_mutex);
359 } else {
360 LogError(0, RS_RET_ERR,
361 "mmsequence: mutex lock has failed!");
362 }
363
364 break;
365 default:
366 LogError(0, RS_RET_NOT_IMPLEMENTED,
367 "mmsequence: this mode is not currently implemented");
368 }
369
370 /* finalize_it: */
371 json = json_object_new_int(val);
372 if (json == NULL) {
373 LogError(0, RS_RET_OBJ_CREATION_FAILED,
374 "mmsequence: unable to create JSON");
375 } else if (RS_RET_OK != msgAddJSON(pMsg, (uchar *)pData->pszVar + 1, json, 0, 0)) {
376 LogError(0, RS_RET_OBJ_CREATION_FAILED,
377 "mmsequence: unable to pass out the value");
378 json_object_put(json);
379 }
380 ENDdoAction
381
382
383 NO_LEGACY_CONF_parseSelectorAct
384
385
386 BEGINmodExit
387 CODESTARTmodExit
388 ENDmodExit
389
390
391 BEGINqueryEtryPt
392 CODESTARTqueryEtryPt
393 CODEqueryEtryPt_STD_OMOD_QUERIES
394 CODEqueryEtryPt_STD_OMOD8_QUERIES
395 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
396 CODEqueryEtryPt_STD_CONF2_QUERIES
397 ENDqueryEtryPt
398
399
400
401 BEGINmodInit()
402 CODESTARTmodInit
403 *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
404 CODEmodInit_QueryRegCFSLineHdlr
405 DBGPRINTF("mmsequence: module compiled with rsyslog version %s.\n", VERSION);
406 ENDmodInit
407