1 /* ratelimit.c
2  * support for rate-limiting sources, including "last message
3  * repeated n times" processing.
4  *
5  * Copyright 2012-2020 Rainer Gerhards and Adiscon GmbH.
6  *
7  * This file is part of the rsyslog runtime library.
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *       http://www.apache.org/licenses/LICENSE-2.0
14  *       -or-
15  *       see COPYING.ASL20 in the source distribution
16  *
17  * Unless required by applicable law or agreed to in writing, software
18  * distributed under the License is distributed on an "AS IS" BASIS,
19  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  * See the License for the specific language governing permissions and
21  * limitations under the License.
22  */
23 #include "config.h"
24 #include <stdlib.h>
25 #include <string.h>
26 #include <assert.h>
27 
28 #include "rsyslog.h"
29 #include "errmsg.h"
30 #include "ratelimit.h"
31 #include "datetime.h"
32 #include "parser.h"
33 #include "unicode-helper.h"
34 #include "msg.h"
35 #include "rsconf.h"
36 #include "dirty.h"
37 
38 /* definitions for objects we access */
39 DEFobjStaticHelpers
DEFobjCurrIf(glbl)40 DEFobjCurrIf(glbl)
41 DEFobjCurrIf(datetime)
42 DEFobjCurrIf(parser)
43 
44 /* static data */
45 
46 /* generate a "repeated n times" message */
47 static smsg_t *
48 ratelimitGenRepMsg(ratelimit_t *ratelimit)
49 {
50 	smsg_t *repMsg;
51 	size_t lenRepMsg;
52 	uchar szRepMsg[1024];
53 
54 	if(ratelimit->nsupp == 1) { /* we simply use the original message! */
55 		repMsg = MsgAddRef(ratelimit->pMsg);
56 	} else {/* we need to duplicate, original message may still be in use in other
57 		 * parts of the system!  */
58 		if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
59 			DBGPRINTF("Message duplication failed, dropping repeat message.\n");
60 			goto done;
61 		}
62 		lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
63 					" message repeated %d times: [%.800s]",
64 					ratelimit->nsupp, getMSG(ratelimit->pMsg));
65 		MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
66 	}
67 
68 done:	return repMsg;
69 }
70 
71 static rsRetVal
doLastMessageRepeatedNTimes(ratelimit_t * ratelimit,smsg_t * pMsg,smsg_t ** ppRepMsg)72 doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, smsg_t *pMsg, smsg_t **ppRepMsg)
73 {
74 	int bNeedUnlockMutex = 0;
75 	DEFiRet;
76 
77 	if(ratelimit->bThreadSafe) {
78 		pthread_mutex_lock(&ratelimit->mut);
79 		bNeedUnlockMutex = 1;
80 	}
81 
82 	if( ratelimit->pMsg != NULL &&
83 	    getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
84 	    !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) &&
85 	    !strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) &&
86 	    !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) &&
87 	    !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) {
88 		ratelimit->nsupp++;
89 		DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp);
90 		/* use current message, so we have the new timestamp
91 		 * (means we need to discard previous one) */
92 		msgDestruct(&ratelimit->pMsg);
93 		ratelimit->pMsg = pMsg;
94 		ABORT_FINALIZE(RS_RET_DISCARDMSG);
95 	} else {/* new message, do "repeat processing" & save it */
96 		if(ratelimit->pMsg != NULL) {
97 			if(ratelimit->nsupp > 0) {
98 				*ppRepMsg = ratelimitGenRepMsg(ratelimit);
99 				ratelimit->nsupp = 0;
100 			}
101 			msgDestruct(&ratelimit->pMsg);
102 		}
103 		ratelimit->pMsg = MsgAddRef(pMsg);
104 	}
105 
106 finalize_it:
107 	if(bNeedUnlockMutex)
108 		pthread_mutex_unlock(&ratelimit->mut);
109 	RETiRet;
110 }
111 
112 
113 /* helper: tell how many messages we lost due to linux-like ratelimiting */
114 static void
tellLostCnt(ratelimit_t * ratelimit)115 tellLostCnt(ratelimit_t *ratelimit)
116 {
117 	uchar msgbuf[1024];
118 	if(ratelimit->missed) {
119 		snprintf((char*)msgbuf, sizeof(msgbuf),
120 			 "%s: %u messages lost due to rate-limiting (%u allowed within %u seconds)",
121 			 ratelimit->name, ratelimit->missed, ratelimit->burst, ratelimit->interval);
122 		ratelimit->missed = 0;
123 		logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
124 	}
125 }
126 
127 /* Linux-like ratelimiting, modelled after the linux kernel
128  * returns 1 if message is within rate limit and shall be
129  * processed, 0 otherwise.
130  * This implementation is NOT THREAD-SAFE and must not
131  * be called concurrently.
132  */
ATTR_NONNULL()133 static int ATTR_NONNULL()
134 withinRatelimit(ratelimit_t *__restrict__ const ratelimit,
135 	time_t tt,
136 	const char*const appname)
137 {
138 	int ret;
139 	uchar msgbuf[1024];
140 
141 	if(ratelimit->bThreadSafe) {
142 		pthread_mutex_lock(&ratelimit->mut);
143 	}
144 
145 	if(ratelimit->interval == 0) {
146 		ret = 1;
147 		goto finalize_it;
148 	}
149 
150 	/* we primarily need "NoTimeCache" mode for imjournal, as it
151 	 * sets the message generation time to the journal timestamp.
152 	 * As such, we do not get a proper indication of the actual
153 	 * message rate. To prevent this, we need to query local
154 	 * system time ourselvs.
155 	 */
156 	if(ratelimit->bNoTimeCache)
157 		tt = time(NULL);
158 
159 	assert(ratelimit->burst != 0);
160 
161 	if(ratelimit->begin == 0)
162 		ratelimit->begin = tt;
163 
164 	/* resume if we go out of time window or if time has gone backwards */
165 	if((tt > (time_t)(ratelimit->begin + ratelimit->interval)) || (tt < ratelimit->begin) ) {
166 		ratelimit->begin = 0;
167 		ratelimit->done = 0;
168 		tellLostCnt(ratelimit);
169 	}
170 
171 	/* do actual limit check */
172 	if(ratelimit->burst > ratelimit->done) {
173 		ratelimit->done++;
174 		ret = 1;
175 	} else {
176 		ratelimit->missed++;
177 		if(ratelimit->missed == 1) {
178 			snprintf((char*)msgbuf, sizeof(msgbuf),
179 				"%s from <%s>: begin to drop messages due to rate-limiting",
180 				ratelimit->name, appname);
181 			logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
182 		}
183 		ret = 0;
184 	}
185 
186 finalize_it:
187 	if(ratelimit->bThreadSafe) {
188 		pthread_mutex_unlock(&ratelimit->mut);
189 	}
190 	return ret;
191 }
192 
193 /* ratelimit a message based on message count
194  * - handles only rate-limiting
195  * This function returns RS_RET_OK, if the caller shall process
196  * the message regularly and RS_RET_DISCARD if the caller must
197  * discard the message. The caller should also discard the message
198  * if another return status occurs.
199  */
200 rsRetVal
ratelimitMsgCount(ratelimit_t * __restrict__ const ratelimit,time_t tt,const char * const appname)201 ratelimitMsgCount(ratelimit_t *__restrict__ const ratelimit,
202 	time_t tt,
203 	const char* const appname)
204 {
205 	DEFiRet;
206 	if(ratelimit->interval) {
207 		if(withinRatelimit(ratelimit, tt, appname) == 0) {
208 			ABORT_FINALIZE(RS_RET_DISCARDMSG);
209 		}
210 	}
211 finalize_it:
212 	if(Debug) {
213 		if(iRet == RS_RET_DISCARDMSG)
214 			DBGPRINTF("message discarded by ratelimiting\n");
215 	}
216 	RETiRet;
217 }
218 
219 /* ratelimit a message, that means:
220  * - handle "last message repeated n times" logic
221  * - handle actual (discarding) rate-limiting
222  * This function returns RS_RET_OK, if the caller shall process
223  * the message regularly and RS_RET_DISCARD if the caller must
224  * discard the message. The caller should also discard the message
225  * if another return status occurs. This places some burden on the
226  * caller logic, but provides best performance. Demanding this
227  * cooperative mode can enable a faulty caller to thrash up part
228  * of the system, but we accept that risk (a faulty caller can
229  * always do all sorts of evil, so...)
230  * If *ppRepMsg != NULL on return, the caller must enqueue that
231  * message before the original message.
232  */
233 rsRetVal
ratelimitMsg(ratelimit_t * __restrict__ const ratelimit,smsg_t * pMsg,smsg_t ** ppRepMsg)234 ratelimitMsg(ratelimit_t *__restrict__ const ratelimit, smsg_t *pMsg, smsg_t **ppRepMsg)
235 {
236 	DEFiRet;
237 	rsRetVal localRet;
238 	int severity = 0;
239 
240 	*ppRepMsg = NULL;
241 
242 	if(runConf->globals.bReduceRepeatMsgs || ratelimit->severity > 0) {
243 		/* consider early parsing only if really needed */
244 		if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
245 			if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK)  {
246 				DBGPRINTF("Message discarded, parsing error %d\n", localRet);
247 				ABORT_FINALIZE(RS_RET_DISCARDMSG);
248 			}
249 		}
250 		severity = pMsg->iSeverity;
251 	}
252 
253 	/* Only the messages having severity level at or below the
254 	 * treshold (the value is >=) are subject to ratelimiting. */
255 	if(ratelimit->interval && (severity >= ratelimit->severity)) {
256 		char namebuf[512]; /* 256 for FGDN adn 256 for APPNAME should be enough */
257 		snprintf(namebuf, sizeof namebuf, "%s:%s", getHOSTNAME(pMsg),
258 			getAPPNAME(pMsg, 0));
259 		if(withinRatelimit(ratelimit, pMsg->ttGenTime, namebuf) == 0) {
260 			msgDestruct(&pMsg);
261 			ABORT_FINALIZE(RS_RET_DISCARDMSG);
262 		}
263 	}
264 	if(runConf->globals.bReduceRepeatMsgs) {
265 		CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
266 	}
267 finalize_it:
268 	if(Debug) {
269 		if(iRet == RS_RET_DISCARDMSG)
270 			DBGPRINTF("message discarded by ratelimiting\n");
271 	}
272 	RETiRet;
273 }
274 
275 /* returns 1, if the ratelimiter performs any checks and 0 otherwise */
276 int
ratelimitChecked(ratelimit_t * ratelimit)277 ratelimitChecked(ratelimit_t *ratelimit)
278 {
279 	return ratelimit->interval || runConf->globals.bReduceRepeatMsgs;
280 }
281 
282 
283 /* add a message to a ratelimiter/multisubmit structure.
284  * ratelimiting is automatically handled according to the ratelimit
285  * settings.
286  * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
287  */
288 rsRetVal
ratelimitAddMsg(ratelimit_t * ratelimit,multi_submit_t * pMultiSub,smsg_t * pMsg)289 ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, smsg_t *pMsg)
290 {
291 	rsRetVal localRet;
292 	smsg_t *repMsg;
293 	DEFiRet;
294 
295 	localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
296 	if(pMultiSub == NULL) {
297 		if(repMsg != NULL)
298 			CHKiRet(submitMsg2(repMsg));
299 		CHKiRet(localRet);
300 		CHKiRet(submitMsg2(pMsg));
301 	} else {
302 		if(repMsg != NULL) {
303 			pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
304 			if(pMultiSub->nElem == pMultiSub->maxElem)
305 				CHKiRet(multiSubmitMsg2(pMultiSub));
306 		}
307 		CHKiRet(localRet);
308 		if(pMsg->iLenRawMsg > glblGetMaxLine()) {
309 			/* oversize message needs special processing. We keep
310 			 * at least the previous batch as batch...
311 			 */
312 			if(pMultiSub->nElem > 0) {
313 				CHKiRet(multiSubmitMsg2(pMultiSub));
314 			}
315 			CHKiRet(submitMsg2(pMsg));
316 			FINALIZE;
317 		}
318 		pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
319 		if(pMultiSub->nElem == pMultiSub->maxElem)
320 			CHKiRet(multiSubmitMsg2(pMultiSub));
321 	}
322 
323 finalize_it:
324 	RETiRet;
325 }
326 
327 
328 /* modname must be a static name (usually expected to be the module
329  * name and MUST be present. dynname may be NULL and can be used for
330  * dynamic information, e.g. PID or listener IP, ...
331  * Both values should be kept brief.
332  */
333 rsRetVal
ratelimitNew(ratelimit_t ** ppThis,const char * modname,const char * dynname)334 ratelimitNew(ratelimit_t **ppThis, const char *modname, const char *dynname)
335 {
336 	ratelimit_t *pThis;
337 	char namebuf[256];
338 	DEFiRet;
339 
340 	CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
341 	if(modname == NULL)
342 		modname ="*ERROR:MODULE NAME MISSING*";
343 
344 	if(dynname == NULL) {
345 		pThis->name = strdup(modname);
346 	} else {
347 		snprintf(namebuf, sizeof(namebuf), "%s[%s]",
348 			 modname, dynname);
349 		namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
350 		pThis->name = strdup(namebuf);
351 	}
352 	DBGPRINTF("ratelimit:%s:new ratelimiter\n", pThis->name);
353 	*ppThis = pThis;
354 finalize_it:
355 	RETiRet;
356 }
357 
358 
359 /* enable linux-like ratelimiting */
360 void
ratelimitSetLinuxLike(ratelimit_t * ratelimit,unsigned int interval,unsigned int burst)361 ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned int interval, unsigned int burst)
362 {
363 	ratelimit->interval = interval;
364 	ratelimit->burst = burst;
365 	ratelimit->done = 0;
366 	ratelimit->missed = 0;
367 	ratelimit->begin = 0;
368 }
369 
370 
371 /* enable thread-safe operations mode. This make sure that
372  * a single ratelimiter can be called from multiple threads. As
373  * this causes some overhead and is not always required, it needs
374  * to be explicitely enabled. This operation cannot be undone
375  * (think: why should one do that???)
376  */
377 void
ratelimitSetThreadSafe(ratelimit_t * ratelimit)378 ratelimitSetThreadSafe(ratelimit_t *ratelimit)
379 {
380 	ratelimit->bThreadSafe = 1;
381 	pthread_mutex_init(&ratelimit->mut, NULL);
382 }
383 void
ratelimitSetNoTimeCache(ratelimit_t * ratelimit)384 ratelimitSetNoTimeCache(ratelimit_t *ratelimit)
385 {
386 	ratelimit->bNoTimeCache = 1;
387 	pthread_mutex_init(&ratelimit->mut, NULL);
388 }
389 
/* Severity level determines which messages are subject to
 * ratelimiting: only messages whose numeric severity is >= this
 * value are ratelimited (see ratelimitMsg). Default (no value set,
 * i.e. the zero from allocation) is all messages.
 */
void
ratelimitSetSeverity(ratelimit_t *ratelimit, intTiny severity)
{
	ratelimit->severity = severity;
}
398 
399 void
ratelimitDestruct(ratelimit_t * ratelimit)400 ratelimitDestruct(ratelimit_t *ratelimit)
401 {
402 	smsg_t *pMsg;
403 	if(ratelimit->pMsg != NULL) {
404 		if(ratelimit->nsupp > 0) {
405 			pMsg = ratelimitGenRepMsg(ratelimit);
406 			if(pMsg != NULL)
407 				submitMsg2(pMsg);
408 		}
409 		msgDestruct(&ratelimit->pMsg);
410 	}
411 	tellLostCnt(ratelimit);
412 	if(ratelimit->bThreadSafe)
413 		pthread_mutex_destroy(&ratelimit->mut);
414 	free(ratelimit->name);
415 	free(ratelimit);
416 }
417 
/* Module shutdown: release the datetime, glbl and parser core object
 * interfaces that ratelimitModInit() acquired via objUse().
 */
void
ratelimitModExit(void)
{
	objRelease(datetime, CORE_COMPONENT);
	objRelease(glbl, CORE_COMPONENT);
	objRelease(parser, CORE_COMPONENT);
}
425 
/* Module initialization: obtain the object interface, then acquire the
 * glbl, datetime and parser core interfaces declared at the top of this
 * file (parser is used by ratelimitMsg for early parsing).
 * Returns RS_RET_OK, or the status of the first step that failed.
 */
rsRetVal
ratelimitModInit(void)
{
	DEFiRet;
	CHKiRet(objGetObjInterface(&obj));
	CHKiRet(objUse(glbl, CORE_COMPONENT));
	CHKiRet(objUse(datetime, CORE_COMPONENT));
	CHKiRet(objUse(parser, CORE_COMPONENT));
finalize_it:
	RETiRet;
}
437