1 /* ratelimit.c
2 * support for rate-limiting sources, including "last message
3 * repeated n times" processing.
4 *
5 * Copyright 2012-2020 Rainer Gerhards and Adiscon GmbH.
6 *
7 * This file is part of the rsyslog runtime library.
8 *
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 *
13 * http://www.apache.org/licenses/LICENSE-2.0
14 * -or-
15 * see COPYING.ASL20 in the source distribution
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23 #include "config.h"
24 #include <stdlib.h>
25 #include <string.h>
26 #include <assert.h>
27
28 #include "rsyslog.h"
29 #include "errmsg.h"
30 #include "ratelimit.h"
31 #include "datetime.h"
32 #include "parser.h"
33 #include "unicode-helper.h"
34 #include "msg.h"
35 #include "rsconf.h"
36 #include "dirty.h"
37
38 /* definitions for objects we access */
39 DEFobjStaticHelpers
DEFobjCurrIf(glbl)40 DEFobjCurrIf(glbl)
41 DEFobjCurrIf(datetime)
42 DEFobjCurrIf(parser)
43
44 /* static data */
45
46 /* generate a "repeated n times" message */
47 static smsg_t *
48 ratelimitGenRepMsg(ratelimit_t *ratelimit)
49 {
50 smsg_t *repMsg;
51 size_t lenRepMsg;
52 uchar szRepMsg[1024];
53
54 if(ratelimit->nsupp == 1) { /* we simply use the original message! */
55 repMsg = MsgAddRef(ratelimit->pMsg);
56 } else {/* we need to duplicate, original message may still be in use in other
57 * parts of the system! */
58 if((repMsg = MsgDup(ratelimit->pMsg)) == NULL) {
59 DBGPRINTF("Message duplication failed, dropping repeat message.\n");
60 goto done;
61 }
62 lenRepMsg = snprintf((char*)szRepMsg, sizeof(szRepMsg),
63 " message repeated %d times: [%.800s]",
64 ratelimit->nsupp, getMSG(ratelimit->pMsg));
65 MsgReplaceMSG(repMsg, szRepMsg, lenRepMsg);
66 }
67
68 done: return repMsg;
69 }
70
71 static rsRetVal
doLastMessageRepeatedNTimes(ratelimit_t * ratelimit,smsg_t * pMsg,smsg_t ** ppRepMsg)72 doLastMessageRepeatedNTimes(ratelimit_t *ratelimit, smsg_t *pMsg, smsg_t **ppRepMsg)
73 {
74 int bNeedUnlockMutex = 0;
75 DEFiRet;
76
77 if(ratelimit->bThreadSafe) {
78 pthread_mutex_lock(&ratelimit->mut);
79 bNeedUnlockMutex = 1;
80 }
81
82 if( ratelimit->pMsg != NULL &&
83 getMSGLen(pMsg) == getMSGLen(ratelimit->pMsg) &&
84 !ustrcmp(getMSG(pMsg), getMSG(ratelimit->pMsg)) &&
85 !strcmp(getHOSTNAME(pMsg), getHOSTNAME(ratelimit->pMsg)) &&
86 !strcmp(getPROCID(pMsg, LOCK_MUTEX), getPROCID(ratelimit->pMsg, LOCK_MUTEX)) &&
87 !strcmp(getAPPNAME(pMsg, LOCK_MUTEX), getAPPNAME(ratelimit->pMsg, LOCK_MUTEX))) {
88 ratelimit->nsupp++;
89 DBGPRINTF("msg repeated %d times\n", ratelimit->nsupp);
90 /* use current message, so we have the new timestamp
91 * (means we need to discard previous one) */
92 msgDestruct(&ratelimit->pMsg);
93 ratelimit->pMsg = pMsg;
94 ABORT_FINALIZE(RS_RET_DISCARDMSG);
95 } else {/* new message, do "repeat processing" & save it */
96 if(ratelimit->pMsg != NULL) {
97 if(ratelimit->nsupp > 0) {
98 *ppRepMsg = ratelimitGenRepMsg(ratelimit);
99 ratelimit->nsupp = 0;
100 }
101 msgDestruct(&ratelimit->pMsg);
102 }
103 ratelimit->pMsg = MsgAddRef(pMsg);
104 }
105
106 finalize_it:
107 if(bNeedUnlockMutex)
108 pthread_mutex_unlock(&ratelimit->mut);
109 RETiRet;
110 }
111
112
113 /* helper: tell how many messages we lost due to linux-like ratelimiting */
114 static void
tellLostCnt(ratelimit_t * ratelimit)115 tellLostCnt(ratelimit_t *ratelimit)
116 {
117 uchar msgbuf[1024];
118 if(ratelimit->missed) {
119 snprintf((char*)msgbuf, sizeof(msgbuf),
120 "%s: %u messages lost due to rate-limiting (%u allowed within %u seconds)",
121 ratelimit->name, ratelimit->missed, ratelimit->burst, ratelimit->interval);
122 ratelimit->missed = 0;
123 logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
124 }
125 }
126
127 /* Linux-like ratelimiting, modelled after the linux kernel
128 * returns 1 if message is within rate limit and shall be
129 * processed, 0 otherwise.
130 * This implementation is NOT THREAD-SAFE and must not
131 * be called concurrently.
132 */
ATTR_NONNULL()133 static int ATTR_NONNULL()
134 withinRatelimit(ratelimit_t *__restrict__ const ratelimit,
135 time_t tt,
136 const char*const appname)
137 {
138 int ret;
139 uchar msgbuf[1024];
140
141 if(ratelimit->bThreadSafe) {
142 pthread_mutex_lock(&ratelimit->mut);
143 }
144
145 if(ratelimit->interval == 0) {
146 ret = 1;
147 goto finalize_it;
148 }
149
150 /* we primarily need "NoTimeCache" mode for imjournal, as it
151 * sets the message generation time to the journal timestamp.
152 * As such, we do not get a proper indication of the actual
153 * message rate. To prevent this, we need to query local
154 * system time ourselvs.
155 */
156 if(ratelimit->bNoTimeCache)
157 tt = time(NULL);
158
159 assert(ratelimit->burst != 0);
160
161 if(ratelimit->begin == 0)
162 ratelimit->begin = tt;
163
164 /* resume if we go out of time window or if time has gone backwards */
165 if((tt > (time_t)(ratelimit->begin + ratelimit->interval)) || (tt < ratelimit->begin) ) {
166 ratelimit->begin = 0;
167 ratelimit->done = 0;
168 tellLostCnt(ratelimit);
169 }
170
171 /* do actual limit check */
172 if(ratelimit->burst > ratelimit->done) {
173 ratelimit->done++;
174 ret = 1;
175 } else {
176 ratelimit->missed++;
177 if(ratelimit->missed == 1) {
178 snprintf((char*)msgbuf, sizeof(msgbuf),
179 "%s from <%s>: begin to drop messages due to rate-limiting",
180 ratelimit->name, appname);
181 logmsgInternal(RS_RET_RATE_LIMITED, LOG_SYSLOG|LOG_INFO, msgbuf, 0);
182 }
183 ret = 0;
184 }
185
186 finalize_it:
187 if(ratelimit->bThreadSafe) {
188 pthread_mutex_unlock(&ratelimit->mut);
189 }
190 return ret;
191 }
192
193 /* ratelimit a message based on message count
194 * - handles only rate-limiting
195 * This function returns RS_RET_OK, if the caller shall process
196 * the message regularly and RS_RET_DISCARD if the caller must
197 * discard the message. The caller should also discard the message
198 * if another return status occurs.
199 */
200 rsRetVal
ratelimitMsgCount(ratelimit_t * __restrict__ const ratelimit,time_t tt,const char * const appname)201 ratelimitMsgCount(ratelimit_t *__restrict__ const ratelimit,
202 time_t tt,
203 const char* const appname)
204 {
205 DEFiRet;
206 if(ratelimit->interval) {
207 if(withinRatelimit(ratelimit, tt, appname) == 0) {
208 ABORT_FINALIZE(RS_RET_DISCARDMSG);
209 }
210 }
211 finalize_it:
212 if(Debug) {
213 if(iRet == RS_RET_DISCARDMSG)
214 DBGPRINTF("message discarded by ratelimiting\n");
215 }
216 RETiRet;
217 }
218
219 /* ratelimit a message, that means:
220 * - handle "last message repeated n times" logic
221 * - handle actual (discarding) rate-limiting
222 * This function returns RS_RET_OK, if the caller shall process
223 * the message regularly and RS_RET_DISCARD if the caller must
224 * discard the message. The caller should also discard the message
225 * if another return status occurs. This places some burden on the
226 * caller logic, but provides best performance. Demanding this
227 * cooperative mode can enable a faulty caller to thrash up part
228 * of the system, but we accept that risk (a faulty caller can
229 * always do all sorts of evil, so...)
230 * If *ppRepMsg != NULL on return, the caller must enqueue that
231 * message before the original message.
232 */
233 rsRetVal
ratelimitMsg(ratelimit_t * __restrict__ const ratelimit,smsg_t * pMsg,smsg_t ** ppRepMsg)234 ratelimitMsg(ratelimit_t *__restrict__ const ratelimit, smsg_t *pMsg, smsg_t **ppRepMsg)
235 {
236 DEFiRet;
237 rsRetVal localRet;
238 int severity = 0;
239
240 *ppRepMsg = NULL;
241
242 if(runConf->globals.bReduceRepeatMsgs || ratelimit->severity > 0) {
243 /* consider early parsing only if really needed */
244 if((pMsg->msgFlags & NEEDS_PARSING) != 0) {
245 if((localRet = parser.ParseMsg(pMsg)) != RS_RET_OK) {
246 DBGPRINTF("Message discarded, parsing error %d\n", localRet);
247 ABORT_FINALIZE(RS_RET_DISCARDMSG);
248 }
249 }
250 severity = pMsg->iSeverity;
251 }
252
253 /* Only the messages having severity level at or below the
254 * treshold (the value is >=) are subject to ratelimiting. */
255 if(ratelimit->interval && (severity >= ratelimit->severity)) {
256 char namebuf[512]; /* 256 for FGDN adn 256 for APPNAME should be enough */
257 snprintf(namebuf, sizeof namebuf, "%s:%s", getHOSTNAME(pMsg),
258 getAPPNAME(pMsg, 0));
259 if(withinRatelimit(ratelimit, pMsg->ttGenTime, namebuf) == 0) {
260 msgDestruct(&pMsg);
261 ABORT_FINALIZE(RS_RET_DISCARDMSG);
262 }
263 }
264 if(runConf->globals.bReduceRepeatMsgs) {
265 CHKiRet(doLastMessageRepeatedNTimes(ratelimit, pMsg, ppRepMsg));
266 }
267 finalize_it:
268 if(Debug) {
269 if(iRet == RS_RET_DISCARDMSG)
270 DBGPRINTF("message discarded by ratelimiting\n");
271 }
272 RETiRet;
273 }
274
275 /* returns 1, if the ratelimiter performs any checks and 0 otherwise */
276 int
ratelimitChecked(ratelimit_t * ratelimit)277 ratelimitChecked(ratelimit_t *ratelimit)
278 {
279 return ratelimit->interval || runConf->globals.bReduceRepeatMsgs;
280 }
281
282
283 /* add a message to a ratelimiter/multisubmit structure.
284 * ratelimiting is automatically handled according to the ratelimit
285 * settings.
286 * if pMultiSub == NULL, a single-message enqueue happens (under reconsideration)
287 */
288 rsRetVal
ratelimitAddMsg(ratelimit_t * ratelimit,multi_submit_t * pMultiSub,smsg_t * pMsg)289 ratelimitAddMsg(ratelimit_t *ratelimit, multi_submit_t *pMultiSub, smsg_t *pMsg)
290 {
291 rsRetVal localRet;
292 smsg_t *repMsg;
293 DEFiRet;
294
295 localRet = ratelimitMsg(ratelimit, pMsg, &repMsg);
296 if(pMultiSub == NULL) {
297 if(repMsg != NULL)
298 CHKiRet(submitMsg2(repMsg));
299 CHKiRet(localRet);
300 CHKiRet(submitMsg2(pMsg));
301 } else {
302 if(repMsg != NULL) {
303 pMultiSub->ppMsgs[pMultiSub->nElem++] = repMsg;
304 if(pMultiSub->nElem == pMultiSub->maxElem)
305 CHKiRet(multiSubmitMsg2(pMultiSub));
306 }
307 CHKiRet(localRet);
308 if(pMsg->iLenRawMsg > glblGetMaxLine()) {
309 /* oversize message needs special processing. We keep
310 * at least the previous batch as batch...
311 */
312 if(pMultiSub->nElem > 0) {
313 CHKiRet(multiSubmitMsg2(pMultiSub));
314 }
315 CHKiRet(submitMsg2(pMsg));
316 FINALIZE;
317 }
318 pMultiSub->ppMsgs[pMultiSub->nElem++] = pMsg;
319 if(pMultiSub->nElem == pMultiSub->maxElem)
320 CHKiRet(multiSubmitMsg2(pMultiSub));
321 }
322
323 finalize_it:
324 RETiRet;
325 }
326
327
328 /* modname must be a static name (usually expected to be the module
329 * name and MUST be present. dynname may be NULL and can be used for
330 * dynamic information, e.g. PID or listener IP, ...
331 * Both values should be kept brief.
332 */
333 rsRetVal
ratelimitNew(ratelimit_t ** ppThis,const char * modname,const char * dynname)334 ratelimitNew(ratelimit_t **ppThis, const char *modname, const char *dynname)
335 {
336 ratelimit_t *pThis;
337 char namebuf[256];
338 DEFiRet;
339
340 CHKmalloc(pThis = calloc(1, sizeof(ratelimit_t)));
341 if(modname == NULL)
342 modname ="*ERROR:MODULE NAME MISSING*";
343
344 if(dynname == NULL) {
345 pThis->name = strdup(modname);
346 } else {
347 snprintf(namebuf, sizeof(namebuf), "%s[%s]",
348 modname, dynname);
349 namebuf[sizeof(namebuf)-1] = '\0'; /* to be on safe side */
350 pThis->name = strdup(namebuf);
351 }
352 DBGPRINTF("ratelimit:%s:new ratelimiter\n", pThis->name);
353 *ppThis = pThis;
354 finalize_it:
355 RETiRet;
356 }
357
358
359 /* enable linux-like ratelimiting */
360 void
ratelimitSetLinuxLike(ratelimit_t * ratelimit,unsigned int interval,unsigned int burst)361 ratelimitSetLinuxLike(ratelimit_t *ratelimit, unsigned int interval, unsigned int burst)
362 {
363 ratelimit->interval = interval;
364 ratelimit->burst = burst;
365 ratelimit->done = 0;
366 ratelimit->missed = 0;
367 ratelimit->begin = 0;
368 }
369
370
371 /* enable thread-safe operations mode. This make sure that
372 * a single ratelimiter can be called from multiple threads. As
373 * this causes some overhead and is not always required, it needs
374 * to be explicitely enabled. This operation cannot be undone
375 * (think: why should one do that???)
376 */
377 void
ratelimitSetThreadSafe(ratelimit_t * ratelimit)378 ratelimitSetThreadSafe(ratelimit_t *ratelimit)
379 {
380 ratelimit->bThreadSafe = 1;
381 pthread_mutex_init(&ratelimit->mut, NULL);
382 }
383 void
ratelimitSetNoTimeCache(ratelimit_t * ratelimit)384 ratelimitSetNoTimeCache(ratelimit_t *ratelimit)
385 {
386 ratelimit->bNoTimeCache = 1;
387 pthread_mutex_init(&ratelimit->mut, NULL);
388 }
389
390 /* Severity level determines which messages are subject to
391 * ratelimiting. Default (no value set) is all messages.
392 */
393 void
ratelimitSetSeverity(ratelimit_t * ratelimit,intTiny severity)394 ratelimitSetSeverity(ratelimit_t *ratelimit, intTiny severity)
395 {
396 ratelimit->severity = severity;
397 }
398
399 void
ratelimitDestruct(ratelimit_t * ratelimit)400 ratelimitDestruct(ratelimit_t *ratelimit)
401 {
402 smsg_t *pMsg;
403 if(ratelimit->pMsg != NULL) {
404 if(ratelimit->nsupp > 0) {
405 pMsg = ratelimitGenRepMsg(ratelimit);
406 if(pMsg != NULL)
407 submitMsg2(pMsg);
408 }
409 msgDestruct(&ratelimit->pMsg);
410 }
411 tellLostCnt(ratelimit);
412 if(ratelimit->bThreadSafe)
413 pthread_mutex_destroy(&ratelimit->mut);
414 free(ratelimit->name);
415 free(ratelimit);
416 }
417
418 void
ratelimitModExit(void)419 ratelimitModExit(void)
420 {
421 objRelease(datetime, CORE_COMPONENT);
422 objRelease(glbl, CORE_COMPONENT);
423 objRelease(parser, CORE_COMPONENT);
424 }
425
426 rsRetVal
ratelimitModInit(void)427 ratelimitModInit(void)
428 {
429 DEFiRet;
430 CHKiRet(objGetObjInterface(&obj));
431 CHKiRet(objUse(glbl, CORE_COMPONENT));
432 CHKiRet(objUse(datetime, CORE_COMPONENT));
433 CHKiRet(objUse(parser, CORE_COMPONENT));
434 finalize_it:
435 RETiRet;
436 }
437