1 /* tcpsrv.c
2 *
3 * Common code for plain TCP syslog based servers. This is currently being
4 * utilized by imtcp and imgssapi.
5 *
6 * NOTE: this is *not* a generic TCP server, but one for syslog servers. For
7 * generic stream servers, please use ./runtime/strmsrv.c!
8 *
9 * There are actually two classes within the tcpserver code: one is
10 * the tcpsrv itself, the other one is its sessions. This is a helper
11 * class to tcpsrv.
12 *
13 * The common code here calls upon specific functionality by using
14 * callbacks. The specialised input modules need to set the proper
15 * callbacks before the code is run. The tcpsrv then calls back
16 * into the specific input modules at the appropriate time.
17 *
18 * NOTE: read comments in module-template.h to understand how this file
19 * works!
20 *
21 * File begun on 2007-12-21 by RGerhards (extracted from syslogd.c[which was
22 * licensed under BSD at the time of the rsyslog fork])
23 *
24 * Copyright 2007-2021 Adiscon GmbH.
25 *
26 * This file is part of rsyslog.
27 *
28 * Licensed under the Apache License, Version 2.0 (the "License");
29 * you may not use this file except in compliance with the License.
30 * You may obtain a copy of the License at
31 *
32 * http://www.apache.org/licenses/LICENSE-2.0
33 * -or-
34 * see COPYING.ASL20 in the source distribution
35 *
36 * Unless required by applicable law or agreed to in writing, software
37 * distributed under the License is distributed on an "AS IS" BASIS,
38 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
39 * See the License for the specific language governing permissions and
40 * limitations under the License.
41 */
42 #include "config.h"
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <assert.h>
46 #include <string.h>
47 #include <errno.h>
48 #include <unistd.h>
49 #include <stdarg.h>
50 #include <ctype.h>
51 #include <netinet/in.h>
52 #include <netdb.h>
53 #include <pthread.h>
54 #include <sys/types.h>
55 #include <signal.h>
56 #include <sys/socket.h>
57 #if HAVE_FCNTL_H
58 #include <fcntl.h>
59 #endif
60 #include "rsyslog.h"
61 #include "dirty.h"
62 #include "cfsysline.h"
63 #include "module-template.h"
64 #include "net.h"
65 #include "srUtils.h"
66 #include "conf.h"
67 #include "tcpsrv.h"
68 #include "obj.h"
69 #include "glbl.h"
70 #include "netstrms.h"
71 #include "netstrm.h"
72 #include "nssel.h"
73 #include "nspoll.h"
74 #include "errmsg.h"
75 #include "ruleset.h"
76 #include "ratelimit.h"
77 #include "unicode-helper.h"
78
79 PRAGMA_INGORE_Wswitch_enum
80 MODULE_TYPE_LIB
81 MODULE_TYPE_NOKEEP
82
83 /* defines */
84 #define TCPSESS_MAX_DEFAULT 200 /* default for nbr of tcp sessions if no number is given */
85 #define TCPLSTN_MAX_DEFAULT 20 /* default for nbr of listeners */
86
87 /* static data */
88 DEFobjStaticHelpers
89 DEFobjCurrIf(conf)
90 DEFobjCurrIf(glbl)
91 DEFobjCurrIf(ruleset)
92 DEFobjCurrIf(tcps_sess)
93 DEFobjCurrIf(net)
94 DEFobjCurrIf(netstrms)
95 DEFobjCurrIf(netstrm)
96 DEFobjCurrIf(nssel)
97 DEFobjCurrIf(nspoll)
98 DEFobjCurrIf(prop)
99 DEFobjCurrIf(statsobj)
100
101 static void startWorkerPool(void);
102
103 /* The following structure controls the worker threads. Global data is
104 * needed for their access.
105 */
106 static struct wrkrInfo_s {
107 pthread_t tid; /* the worker's thread ID */
108 pthread_cond_t run;
109 int idx;
110 tcpsrv_t *pSrv; /* pSrv == NULL -> idle */
111 nspoll_t *pPoll;
112 void *pUsr;
113 sbool enabled;
114 long long unsigned numCalled; /* how often was this called */
115 } wrkrInfo[4];
116 static sbool bWrkrRunning; /* are the worker threads running? */
117 static pthread_mutex_t wrkrMut;
118 static pthread_cond_t wrkrIdle;
119 static int wrkrMax = 4;
120 static int wrkrRunning;
121
122 /* add new listener port to listener port list
123 * rgerhards, 2009-05-21
124 */
125 static rsRetVal ATTR_NONNULL(1, 2)
addNewLstnPort(tcpsrv_t * const pThis,tcpLstnParams_t * const cnf_params)126 addNewLstnPort(tcpsrv_t *const pThis, tcpLstnParams_t *const cnf_params)
127 {
128 tcpLstnPortList_t *pEntry;
129 uchar statname[64];
130 DEFiRet;
131
132 ISOBJ_TYPE_assert(pThis, tcpsrv);
133
134 /* create entry */
135 CHKmalloc(pEntry = (tcpLstnPortList_t*)calloc(1, sizeof(tcpLstnPortList_t)));
136 pEntry->cnf_params = cnf_params;
137
138 strcpy((char*)pEntry->cnf_params->dfltTZ, (char*)pThis->dfltTZ);
139 pEntry->cnf_params->bSPFramingFix = pThis->bSPFramingFix;
140 pEntry->cnf_params->bPreserveCase = pThis->bPreserveCase;
141 pEntry->pSrv = pThis;
142
143
144 /* support statistics gathering */
145 CHKiRet(ratelimitNew(&pEntry->ratelimiter, "tcperver", NULL));
146 ratelimitSetLinuxLike(pEntry->ratelimiter, pThis->ratelimitInterval, pThis->ratelimitBurst);
147 ratelimitSetThreadSafe(pEntry->ratelimiter);
148
149 CHKiRet(statsobj.Construct(&(pEntry->stats)));
150 snprintf((char*)statname, sizeof(statname), "%s(%s)", cnf_params->pszInputName, cnf_params->pszPort);
151 statname[sizeof(statname)-1] = '\0'; /* just to be on the save side... */
152 CHKiRet(statsobj.SetName(pEntry->stats, statname));
153 CHKiRet(statsobj.SetOrigin(pEntry->stats, pThis->pszOrigin));
154 STATSCOUNTER_INIT(pEntry->ctrSubmit, pEntry->mutCtrSubmit);
155 CHKiRet(statsobj.AddCounter(pEntry->stats, UCHAR_CONSTANT("submitted"),
156 ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pEntry->ctrSubmit)));
157 CHKiRet(statsobj.ConstructFinalize(pEntry->stats));
158
159 /* all OK - add to list */
160 pEntry->pNext = pThis->pLstnPorts;
161 pThis->pLstnPorts = pEntry;
162
163 finalize_it:
164 if(iRet != RS_RET_OK) {
165 if(pEntry != NULL) {
166 if(pEntry->cnf_params->pInputName != NULL) {
167 prop.Destruct(&pEntry->cnf_params->pInputName);
168 }
169 if(pEntry->ratelimiter != NULL) {
170 ratelimitDestruct(pEntry->ratelimiter);
171 }
172 if(pEntry->stats != NULL) {
173 statsobj.Destruct(&pEntry->stats);
174 }
175 free(pEntry);
176 }
177 }
178
179 RETiRet;
180 }
181
182
183 /* configure TCP listener settings.
184 * Note: pszPort is handed over to us - the caller MUST NOT free it!
185 * rgerhards, 2008-03-20
186 */
187 static rsRetVal ATTR_NONNULL(1,2)
configureTCPListen(tcpsrv_t * const pThis,tcpLstnParams_t * const cnf_params)188 configureTCPListen(tcpsrv_t *const pThis, tcpLstnParams_t *const cnf_params)
189 {
190 assert(cnf_params->pszPort != NULL);
191 int i;
192 DEFiRet;
193
194 ISOBJ_TYPE_assert(pThis, tcpsrv);
195
196 /* extract port */
197 const uchar *pPort = cnf_params->pszPort;
198 i = 0;
199 while(isdigit((int) *pPort)) {
200 i = i * 10 + *pPort++ - '0';
201 }
202
203 if(i >= 0 && i <= 65535) {
204 CHKiRet(addNewLstnPort(pThis, cnf_params));
205 } else {
206 LogError(0, NO_ERRCODE, "Invalid TCP listen port %s - ignored.\n", cnf_params->pszPort);
207 }
208
209 finalize_it:
210 RETiRet;
211 }
212
213
214 /* Initialize the session table
215 * returns 0 if OK, somewhat else otherwise
216 */
217 static rsRetVal
TCPSessTblInit(tcpsrv_t * pThis)218 TCPSessTblInit(tcpsrv_t *pThis)
219 {
220 DEFiRet;
221
222 ISOBJ_TYPE_assert(pThis, tcpsrv);
223 assert(pThis->pSessions == NULL);
224
225 DBGPRINTF("Allocating buffer for %d TCP sessions.\n", pThis->iSessMax);
226 if((pThis->pSessions = (tcps_sess_t **) calloc(pThis->iSessMax, sizeof(tcps_sess_t *))) == NULL) {
227 DBGPRINTF("Error: TCPSessInit() could not alloc memory for TCP session table.\n");
228 ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
229 }
230
231 finalize_it:
232 RETiRet;
233 }
234
235
236 /* find a free spot in the session table. If the table
237 * is full, -1 is returned, else the index of the free
238 * entry (0 or higher).
239 */
240 static int
TCPSessTblFindFreeSpot(tcpsrv_t * pThis)241 TCPSessTblFindFreeSpot(tcpsrv_t *pThis)
242 {
243 register int i;
244
245 ISOBJ_TYPE_assert(pThis, tcpsrv);
246
247 for(i = 0 ; i < pThis->iSessMax ; ++i) {
248 if(pThis->pSessions[i] == NULL)
249 break;
250 }
251
252 return((i < pThis->iSessMax) ? i : -1);
253 }
254
255
256 /* Get the next session index. Free session tables entries are
257 * skipped. This function is provided the index of the last
258 * session entry, or -1 if no previous entry was obtained. It
259 * returns the index of the next session or -1, if there is no
260 * further entry in the table. Please note that the initial call
261 * might as well return -1, if there is no session at all in the
262 * session table.
263 */
264 static int
TCPSessGetNxtSess(tcpsrv_t * pThis,int iCurr)265 TCPSessGetNxtSess(tcpsrv_t *pThis, int iCurr)
266 {
267 register int i;
268
269 ISOBJ_TYPE_assert(pThis, tcpsrv);
270 assert(pThis->pSessions != NULL);
271 for(i = iCurr + 1 ; i < pThis->iSessMax ; ++i) {
272 if(pThis->pSessions[i] != NULL)
273 break;
274 }
275
276 return((i < pThis->iSessMax) ? i : -1);
277 }
278
279
280 /* De-Initialize TCP listner sockets.
281 * This function deinitializes everything, including freeing the
282 * session table. No TCP listen receive operations are permitted
283 * unless the subsystem is reinitialized.
284 * rgerhards, 2007-06-21
285 */
ATTR_NONNULL()286 static void ATTR_NONNULL()
287 deinit_tcp_listener(tcpsrv_t *const pThis)
288 {
289 int i;
290 tcpLstnPortList_t *pEntry;
291 tcpLstnPortList_t *pDel;
292
293 ISOBJ_TYPE_assert(pThis, tcpsrv);
294
295 if(pThis->pSessions != NULL) {
296 /* close all TCP connections! */
297 if(!pThis->bUsingEPoll) {
298 i = TCPSessGetNxtSess(pThis, -1);
299 while(i != -1) {
300 tcps_sess.Destruct(&pThis->pSessions[i]);
301 /* now get next... */
302 i = TCPSessGetNxtSess(pThis, i);
303 }
304 }
305
306 /* we are done with the session table - so get rid of it... */
307 free(pThis->pSessions);
308 pThis->pSessions = NULL; /* just to make sure... */
309 }
310
311 /* free list of tcp listen ports */
312 pEntry = pThis->pLstnPorts;
313 while(pEntry != NULL) {
314 prop.Destruct(&pEntry->cnf_params->pInputName);
315 free((void*)pEntry->cnf_params->pszInputName);
316 free((void*)pEntry->cnf_params->pszPort);
317 free((void*)pEntry->cnf_params->pszAddr);
318 free((void*)pEntry->cnf_params->pszLstnPortFileName);
319 free((void*)pEntry->cnf_params);
320 ratelimitDestruct(pEntry->ratelimiter);
321 statsobj.Destruct(&(pEntry->stats));
322 pDel = pEntry;
323 pEntry = pEntry->pNext;
324 free(pDel);
325 }
326
327 /* finally close our listen streams */
328 for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
329 netstrm.Destruct(pThis->ppLstn + i);
330 }
331 }
332
333
334 /* add a listen socket to our listen socket array. This is a callback
335 * invoked from the netstrm class. -- rgerhards, 2008-04-23
336 */
337 static rsRetVal
addTcpLstn(void * pUsr,netstrm_t * pLstn)338 addTcpLstn(void *pUsr, netstrm_t *pLstn)
339 {
340 tcpLstnPortList_t *pPortList = (tcpLstnPortList_t *) pUsr;
341 tcpsrv_t *pThis = pPortList->pSrv;
342 DEFiRet;
343
344 ISOBJ_TYPE_assert(pThis, tcpsrv);
345 ISOBJ_TYPE_assert(pLstn, netstrm);
346
347 if(pThis->iLstnCurr >= pThis->iLstnMax)
348 ABORT_FINALIZE(RS_RET_MAX_LSTN_REACHED);
349
350 pThis->ppLstn[pThis->iLstnCurr] = pLstn;
351 pThis->ppLstnPort[pThis->iLstnCurr] = pPortList;
352 ++pThis->iLstnCurr;
353
354 finalize_it:
355 RETiRet;
356 }
357
358
359 /* Initialize TCP listener socket for a single port
360 * Note: at this point, TLS vs. non-TLS does not matter; TLS params are
361 * set on connect!
362 * rgerhards, 2009-05-21
363 */
364 static rsRetVal
initTCPListener(tcpsrv_t * pThis,tcpLstnPortList_t * pPortEntry)365 initTCPListener(tcpsrv_t *pThis, tcpLstnPortList_t *pPortEntry)
366 {
367 DEFiRet;
368
369 ISOBJ_TYPE_assert(pThis, tcpsrv);
370 assert(pPortEntry != NULL);
371
372 // pPortEntry->pszAddr = NULL ==> bind to all interfaces
373 CHKiRet(netstrm.LstnInit(pThis->pNS, (void*)pPortEntry, addTcpLstn,
374 pThis->iSessMax, pPortEntry->cnf_params));
375
376 finalize_it:
377 RETiRet;
378 }
379
380
381 /* Initialize TCP sockets (for listener) and listens on them */
382 static rsRetVal
create_tcp_socket(tcpsrv_t * pThis)383 create_tcp_socket(tcpsrv_t *pThis)
384 {
385 DEFiRet;
386 rsRetVal localRet;
387 tcpLstnPortList_t *pEntry;
388
389 ISOBJ_TYPE_assert(pThis, tcpsrv);
390
391 /* init all configured ports */
392 pEntry = pThis->pLstnPorts;
393 while(pEntry != NULL) {
394 localRet = initTCPListener(pThis, pEntry);
395 if(localRet != RS_RET_OK) {
396 LogError(0, localRet, "Could not create tcp listener, ignoring port "
397 "%s bind-address %s.",
398 (pEntry->cnf_params->pszPort == NULL) ? "**UNSPECIFIED**"
399 : (const char*) pEntry->cnf_params->pszPort,
400 (pEntry->cnf_params->pszAddr == NULL) ? "**UNSPECIFIED**"
401 : (const char*)pEntry->cnf_params->pszAddr);
402 }
403 pEntry = pEntry->pNext;
404 }
405
406 /* OK, we had success. Now it is also time to
407 * initialize our connections
408 */
409 if(TCPSessTblInit(pThis) != 0) {
410 /* OK, we are in some trouble - we could not initialize the
411 * session table, so we can not continue. We need to free all
412 * we have assigned so far, because we can not really use it...
413 */
414 LogError(0, RS_RET_ERR, "Could not initialize TCP session table, suspending TCP "
415 "message reception.");
416 ABORT_FINALIZE(RS_RET_ERR);
417 }
418
419 finalize_it:
420 RETiRet;
421 }
422
423
424 /* Accept new TCP connection; make entry in session table. If there
425 * is no more space left in the connection table, the new TCP
426 * connection is immediately dropped.
427 * ppSess has a pointer to the newly created session, if it succeeds.
428 * If it does not succeed, no session is created and ppSess is
429 * undefined. If the user has provided an OnSessAccept Callback,
430 * this one is executed immediately after creation of the
431 * session object, so that it can do its own initialization.
432 * rgerhards, 2008-03-02
433 */
434 static rsRetVal
SessAccept(tcpsrv_t * pThis,tcpLstnPortList_t * pLstnInfo,tcps_sess_t ** ppSess,netstrm_t * pStrm)435 SessAccept(tcpsrv_t *pThis, tcpLstnPortList_t *pLstnInfo, tcps_sess_t **ppSess, netstrm_t *pStrm)
436 {
437 DEFiRet;
438 tcps_sess_t *pSess = NULL;
439 netstrm_t *pNewStrm = NULL;
440 const tcpLstnParams_t *const cnf_params = pLstnInfo->cnf_params;
441 int iSess = -1;
442 struct sockaddr_storage *addr;
443 uchar *fromHostFQDN = NULL;
444 prop_t *fromHostIP;
445
446 ISOBJ_TYPE_assert(pThis, tcpsrv);
447 assert(pLstnInfo != NULL);
448
449 CHKiRet(netstrm.AcceptConnReq(pStrm, &pNewStrm));
450
451 /* Add to session list */
452 iSess = TCPSessTblFindFreeSpot(pThis);
453 if(iSess == -1) {
454 errno = 0;
455 LogError(0, RS_RET_MAX_SESS_REACHED, "too many tcp sessions - dropping incoming request");
456 ABORT_FINALIZE(RS_RET_MAX_SESS_REACHED);
457 }
458
459 if(pThis->bUseKeepAlive) {
460 CHKiRet(netstrm.SetKeepAliveProbes(pNewStrm, pThis->iKeepAliveProbes));
461 CHKiRet(netstrm.SetKeepAliveTime(pNewStrm, pThis->iKeepAliveTime));
462 CHKiRet(netstrm.SetKeepAliveIntvl(pNewStrm, pThis->iKeepAliveIntvl));
463 CHKiRet(netstrm.EnableKeepAlive(pNewStrm));
464 }
465
466 /* we found a free spot and can construct our session object */
467 if(pThis->gnutlsPriorityString != NULL) {
468 CHKiRet(netstrm.SetGnutlsPriorityString(pNewStrm, pThis->gnutlsPriorityString));
469 }
470 CHKiRet(tcps_sess.Construct(&pSess));
471 CHKiRet(tcps_sess.SetTcpsrv(pSess, pThis));
472 CHKiRet(tcps_sess.SetLstnInfo(pSess, pLstnInfo));
473 if(pThis->OnMsgReceive != NULL)
474 CHKiRet(tcps_sess.SetOnMsgReceive(pSess, pThis->OnMsgReceive));
475
476 /* get the host name */
477 CHKiRet(netstrm.GetRemoteHName(pNewStrm, &fromHostFQDN));
478 if (!cnf_params->bPreserveCase) {
479 /* preserve_case = off */
480 uchar *p;
481 for(p = fromHostFQDN; *p; p++) {
482 if (isupper((int) *p)) {
483 *p = tolower((int) *p);
484 }
485 }
486 }
487 CHKiRet(netstrm.GetRemoteIP(pNewStrm, &fromHostIP));
488 CHKiRet(netstrm.GetRemAddr(pNewStrm, &addr));
489 /* TODO: check if we need to strip the domain name here -- rgerhards, 2008-04-24 */
490
491 /* Here we check if a host is permitted to send us messages. If it isn't, we do not further
492 * process the message but log a warning (if we are configured to do this).
493 * rgerhards, 2005-09-26
494 */
495 if(!pThis->pIsPermittedHost((struct sockaddr*) addr, (char*) fromHostFQDN, pThis->pUsr, pSess->pUsr)) {
496 DBGPRINTF("%s is not an allowed sender\n", fromHostFQDN);
497 if(glbl.GetOption_DisallowWarning()) {
498 errno = 0;
499 LogError(0, RS_RET_HOST_NOT_PERMITTED, "TCP message from disallowed "
500 "sender %s discarded", fromHostFQDN);
501 }
502 ABORT_FINALIZE(RS_RET_HOST_NOT_PERMITTED);
503 }
504
505 /* OK, we have an allowed sender, so let's continue, what
506 * means we can finally fill in the session object.
507 */
508 CHKiRet(tcps_sess.SetHost(pSess, fromHostFQDN));
509 fromHostFQDN = NULL; /* we handed this string over */
510 CHKiRet(tcps_sess.SetHostIP(pSess, fromHostIP));
511 CHKiRet(tcps_sess.SetStrm(pSess, pNewStrm));
512 pNewStrm = NULL; /* prevent it from being freed in error handler, now done in tcps_sess! */
513 CHKiRet(tcps_sess.SetMsgIdx(pSess, 0));
514 CHKiRet(tcps_sess.ConstructFinalize(pSess));
515
516 /* check if we need to call our callback */
517 if(pThis->pOnSessAccept != NULL) {
518 CHKiRet(pThis->pOnSessAccept(pThis, pSess));
519 }
520
521 *ppSess = pSess;
522 if(!pThis->bUsingEPoll)
523 pThis->pSessions[iSess] = pSess;
524 pSess = NULL; /* this is now also handed over */
525
526 finalize_it:
527 if(iRet != RS_RET_OK) {
528 if(pSess != NULL)
529 tcps_sess.Destruct(&pSess);
530 if(pNewStrm != NULL)
531 netstrm.Destruct(&pNewStrm);
532 free(fromHostFQDN);
533 }
534
535 RETiRet;
536 }
537
538
539 static void
RunCancelCleanup(void * arg)540 RunCancelCleanup(void *arg)
541 {
542 nspoll_t **ppPoll = (nspoll_t**) arg;
543
544 if (*ppPoll != NULL)
545 nspoll.Destruct(ppPoll);
546
547 /* Wait for any running workers to finish */
548 pthread_mutex_lock(&wrkrMut);
549 DBGPRINTF("tcpsrv terminating, waiting for %d workers\n", wrkrRunning);
550 while(wrkrRunning > 0) {
551 pthread_cond_wait(&wrkrIdle, &wrkrMut);
552 }
553 pthread_mutex_unlock(&wrkrMut);
554 }
555
556 static void
RunSelectCancelCleanup(void * arg)557 RunSelectCancelCleanup(void *arg)
558 {
559 nssel_t **ppSel = (nssel_t**) arg;
560
561 if(*ppSel != NULL)
562 nssel.Destruct(ppSel);
563 }
564
565
566 /* helper to close a session. Takes status of poll vs. select into consideration.
567 * rgerhards, 2009-11-25
568 */
569 static rsRetVal
closeSess(tcpsrv_t * pThis,tcps_sess_t ** ppSess,nspoll_t * pPoll)570 closeSess(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll) {
571 DEFiRet;
572 if(pPoll != NULL) {
573 CHKiRet(nspoll.Ctl(pPoll, (*ppSess)->pStrm, 0, *ppSess, NSDPOLL_IN, NSDPOLL_DEL));
574 }
575 pThis->pOnRegularClose(*ppSess);
576 tcps_sess.Destruct(ppSess);
577 finalize_it:
578 RETiRet;
579 }
580
581
582 /* process a receive request on one of the streams
583 * If pPoll is non-NULL, we have a netstream in epoll mode, which means we need
584 * to remove any descriptor we close from the epoll set.
585 * rgerhards, 2009-07-020
586 */
587 static rsRetVal
doReceive(tcpsrv_t * pThis,tcps_sess_t ** ppSess,nspoll_t * pPoll)588 doReceive(tcpsrv_t *pThis, tcps_sess_t **ppSess, nspoll_t *pPoll)
589 {
590 char buf[128*1024]; /* reception buffer - may hold a partial or multiple messages */
591 ssize_t iRcvd;
592 rsRetVal localRet;
593 DEFiRet;
594 uchar *pszPeer;
595 int lenPeer;
596 int oserr = 0;
597
598 ISOBJ_TYPE_assert(pThis, tcpsrv);
599 DBGPRINTF("netstream %p with new data\n", (*ppSess)->pStrm);
600 /* Receive message */
601 iRet = pThis->pRcvData(*ppSess, buf, sizeof(buf), &iRcvd, &oserr);
602 switch(iRet) {
603 case RS_RET_CLOSED:
604 if(pThis->bEmitMsgOnClose) {
605 errno = 0;
606 prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer);
607 LogError(0, RS_RET_PEER_CLOSED_CONN, "Netstream session %p closed by remote "
608 "peer %s.\n", (*ppSess)->pStrm, pszPeer);
609 }
610 CHKiRet(closeSess(pThis, ppSess, pPoll));
611 break;
612 case RS_RET_RETRY:
613 /* we simply ignore retry - this is not an error, but we also have not received anything */
614 break;
615 case RS_RET_OK:
616 /* valid data received, process it! */
617 localRet = tcps_sess.DataRcvd(*ppSess, buf, iRcvd);
618 if(localRet != RS_RET_OK && localRet != RS_RET_QUEUE_FULL) {
619 /* in this case, something went awfully wrong.
620 * We are instructed to terminate the session.
621 */
622 prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer);
623 LogError(oserr, localRet, "Tearing down TCP Session from %s", pszPeer);
624 CHKiRet(closeSess(pThis, ppSess, pPoll));
625 }
626 break;
627 default:
628 prop.GetString((*ppSess)->fromHostIP, &pszPeer, &lenPeer);
629 LogError(oserr, iRet, "netstream session %p from %s will be closed due to error",
630 (*ppSess)->pStrm, pszPeer);
631 CHKiRet(closeSess(pThis, ppSess, pPoll));
632 break;
633 }
634
635 finalize_it:
636 RETiRet;
637 }
638
639 /* process a single workset item
640 */
641 static rsRetVal ATTR_NONNULL(1)
processWorksetItem(tcpsrv_t * const pThis,nspoll_t * pPoll,const int idx,void * pUsr)642 processWorksetItem(tcpsrv_t *const pThis, nspoll_t *pPoll, const int idx, void *pUsr)
643 {
644 tcps_sess_t *pNewSess = NULL;
645 tcpLstnParams_t *cnf_params;
646
647 DEFiRet;
648
649 DBGPRINTF("tcpsrv: processing item %d, pUsr %p, bAbortConn\n", idx, pUsr);
650 if(pUsr == pThis->ppLstn) {
651 DBGPRINTF("New connect on NSD %p.\n", pThis->ppLstn[idx]);
652 iRet = SessAccept(pThis, pThis->ppLstnPort[idx], &pNewSess, pThis->ppLstn[idx]);
653 cnf_params = pThis->ppLstnPort[idx]->cnf_params;
654 if(iRet == RS_RET_OK) {
655 if(pPoll != NULL) {
656 CHKiRet(nspoll.Ctl(pPoll, pNewSess->pStrm, 0, pNewSess, NSDPOLL_IN, NSDPOLL_ADD));
657 }
658 DBGPRINTF("New session created with NSD %p.\n", pNewSess);
659 } else {
660 DBGPRINTF("tcpsrv: error %d during accept\n", iRet);
661 }
662 } else {
663 pNewSess = (tcps_sess_t*) pUsr;
664 cnf_params = pNewSess->pLstnInfo->cnf_params;
665 doReceive(pThis, &pNewSess, pPoll);
666 if(pPoll == NULL && pNewSess == NULL) {
667 pThis->pSessions[idx] = NULL;
668 }
669 }
670
671 finalize_it:
672 if(iRet != RS_RET_OK) {
673 LogError(0, iRet, "tcpsrv listener (inputname: '%s') failed "
674 "to process incoming connection with error %d",
675 (cnf_params->pszInputName == NULL) ? (uchar*)"*UNSET*" : cnf_params->pszInputName, iRet);
676 srSleep(0,20000); /* Sleep 20ms */
677 }
678 RETiRet;
679 }
680
681
682 /* worker to process incoming requests
683 */
684 static void * ATTR_NONNULL(1)
wrkr(void * const myself)685 wrkr(void *const myself)
686 {
687 struct wrkrInfo_s *const me = (struct wrkrInfo_s*) myself;
688
689
690 pthread_mutex_lock(&wrkrMut);
691 while(1) {
692 // wait for work, in which case pSrv will be populated
693 while(me->pSrv == NULL && glbl.GetGlobalInputTermState() == 0) {
694 pthread_cond_wait(&me->run, &wrkrMut);
695 }
696 if(me->pSrv == NULL) {
697 // only possible if glbl.GetGlobalInputTermState() == 1
698 // we need to query me->opSrv to avoid clang static
699 // analyzer false positive! -- rgerhards, 2017-10-23
700 assert(glbl.GetGlobalInputTermState() == 1);
701 break;
702 }
703 pthread_mutex_unlock(&wrkrMut);
704
705 ++me->numCalled;
706 processWorksetItem(me->pSrv, me->pPoll, me->idx, me->pUsr);
707
708 pthread_mutex_lock(&wrkrMut);
709 me->pSrv = NULL; /* indicate we are free again */
710 --wrkrRunning;
711 pthread_cond_broadcast(&wrkrIdle);
712 }
713 me->enabled = 0; /* indicate we are no longer available */
714 pthread_mutex_unlock(&wrkrMut);
715
716 return NULL;
717 }
718
719 /* This has been factored out from processWorkset() because
720 * pthread_cleanup_push() invokes setjmp() and this triggers the -Wclobbered
721 * warning for the iRet variable.
722 */
723 static void
waitForWorkers(void)724 waitForWorkers(void)
725 {
726 pthread_mutex_lock(&wrkrMut);
727 pthread_cleanup_push(mutexCancelCleanup, &wrkrMut);
728 while(wrkrRunning > 0) {
729 pthread_cond_wait(&wrkrIdle, &wrkrMut);
730 }
731 pthread_cleanup_pop(1);
732 }
733
734 /* Process a workset, that is handle io. We become activated
735 * from either select or epoll handler. We split the workload
736 * out to a pool of threads, but try to avoid context switches
737 * as much as possible.
738 */
739 static rsRetVal
processWorkset(tcpsrv_t * pThis,nspoll_t * pPoll,int numEntries,nsd_epworkset_t workset[])740 processWorkset(tcpsrv_t *pThis, nspoll_t *pPoll, int numEntries, nsd_epworkset_t workset[])
741 {
742 int i;
743 int origEntries = numEntries;
744 DEFiRet;
745
746 DBGPRINTF("tcpsrv: ready to process %d event entries\n", numEntries);
747
748 while(numEntries > 0) {
749 if(glbl.GetGlobalInputTermState() == 1)
750 ABORT_FINALIZE(RS_RET_FORCE_TERM);
751 if(numEntries == 1) {
752 /* process self, save context switch */
753 iRet = processWorksetItem(pThis, pPoll, workset[numEntries-1].id, workset[numEntries-1].pUsr);
754 } else {
755 /* No cancel handler needed here, since no cancellation
756 * points are executed while wrkrMut is locked.
757 *
758 * Re-evaluate this if you add a DBGPRINTF or something!
759 */
760 pthread_mutex_lock(&wrkrMut);
761 /* check if there is a free worker */
762 for(i = 0 ; (i < wrkrMax) && ((wrkrInfo[i].pSrv != NULL) || (wrkrInfo[i].enabled == 0)) ; ++i)
763 /*do search*/;
764 if(i < wrkrMax) {
765 /* worker free -> use it! */
766 wrkrInfo[i].pSrv = pThis;
767 wrkrInfo[i].pPoll = pPoll;
768 wrkrInfo[i].idx = workset[numEntries -1].id;
769 wrkrInfo[i].pUsr = workset[numEntries -1].pUsr;
770 /* Note: we must increment wrkrRunning HERE and not inside the worker's
771 * code. This is because a worker may actually never start, and thus
772 * increment wrkrRunning, before we finish and check the running worker
773 * count. We can only avoid this by incrementing it here.
774 */
775 ++wrkrRunning;
776 pthread_cond_signal(&wrkrInfo[i].run);
777 pthread_mutex_unlock(&wrkrMut);
778 } else {
779 pthread_mutex_unlock(&wrkrMut);
780 /* no free worker, so we process this one ourselfs */
781 iRet = processWorksetItem(pThis, pPoll, workset[numEntries-1].id,
782 workset[numEntries-1].pUsr);
783 }
784 }
785 --numEntries;
786 }
787
788 if(origEntries > 1) {
789 /* we now need to wait until all workers finish. This is because the
790 * rest of this module can not handle the concurrency introduced
791 * by workers running during the epoll call.
792 */
793 waitForWorkers();
794 }
795
796 finalize_it:
797 RETiRet;
798 }
799
800
801 /* This function is called to gather input.
802 * This variant here is only used if we need to work with a netstream driver
803 * that does not support epoll().
804 */
805 PRAGMA_DIAGNOSTIC_PUSH
806 PRAGMA_IGNORE_Wempty_body
807 static rsRetVal
RunSelect(tcpsrv_t * pThis,nsd_epworkset_t workset[],size_t sizeWorkset)808 RunSelect(tcpsrv_t *pThis, nsd_epworkset_t workset[], size_t sizeWorkset)
809 {
810 DEFiRet;
811 int nfds;
812 int i;
813 int iWorkset;
814 int iTCPSess;
815 int bIsReady;
816 nssel_t *pSel = NULL;
817 rsRetVal localRet;
818
819 ISOBJ_TYPE_assert(pThis, tcpsrv);
820
821 pthread_cleanup_push(RunSelectCancelCleanup, (void*) &pSel);
822 while(1) {
823 CHKiRet(nssel.Construct(&pSel));
824 if(pThis->pszDrvrName != NULL)
825 CHKiRet(nssel.SetDrvrName(pSel, pThis->pszDrvrName));
826 CHKiRet(nssel.ConstructFinalize(pSel));
827
828 /* Add the TCP listen sockets to the list of read descriptors. */
829 for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
830 CHKiRet(nssel.Add(pSel, pThis->ppLstn[i], NSDSEL_RD));
831 }
832
833 /* do the sessions */
834 iTCPSess = TCPSessGetNxtSess(pThis, -1);
835 while(iTCPSess != -1) {
836 /* TODO: access to pNsd is NOT really CLEAN, use method... */
837 CHKiRet(nssel.Add(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD));
838 /* now get next... */
839 iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
840 }
841
842 /* wait for io to become ready */
843 CHKiRet(nssel.Wait(pSel, &nfds));
844 if(glbl.GetGlobalInputTermState() == 1)
845 break; /* terminate input! */
846
847 iWorkset = 0;
848 for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
849 if(glbl.GetGlobalInputTermState() == 1)
850 ABORT_FINALIZE(RS_RET_FORCE_TERM);
851 CHKiRet(nssel.IsReady(pSel, pThis->ppLstn[i], NSDSEL_RD, &bIsReady, &nfds));
852 if(bIsReady) {
853 workset[iWorkset].id = i;
854 workset[iWorkset].pUsr = (void*) pThis->ppLstn;
855 /* this is a flag to indicate listen sock */
856 ++iWorkset;
857 if(iWorkset >= (int) sizeWorkset) {
858 processWorkset(pThis, NULL, iWorkset, workset);
859 iWorkset = 0;
860 }
861 --nfds; /* indicate we have processed one */
862 }
863 }
864
865 /* now check the sessions */
866 iTCPSess = TCPSessGetNxtSess(pThis, -1);
867 while(nfds && iTCPSess != -1) {
868 if(glbl.GetGlobalInputTermState() == 1)
869 ABORT_FINALIZE(RS_RET_FORCE_TERM);
870 localRet = nssel.IsReady(pSel, pThis->pSessions[iTCPSess]->pStrm, NSDSEL_RD,
871 &bIsReady, &nfds);
872 if(bIsReady || localRet != RS_RET_OK) {
873 workset[iWorkset].id = iTCPSess;
874 workset[iWorkset].pUsr = (void*) pThis->pSessions[iTCPSess];
875 ++iWorkset;
876 if(iWorkset >= (int) sizeWorkset) {
877 processWorkset(pThis, NULL, iWorkset, workset);
878 iWorkset = 0;
879 }
880 --nfds; /* indicate we have processed one */
881 }
882 iTCPSess = TCPSessGetNxtSess(pThis, iTCPSess);
883 }
884
885 if(iWorkset > 0)
886 processWorkset(pThis, NULL, iWorkset, workset);
887
888 /* we need to copy back close descriptors */
889 nssel.Destruct(&pSel); /* no iRet check as it is overriden at start of loop! */
890 finalize_it: /* this is a very special case - this time only we do not exit the function,
891 * because that would not help us either. So we simply retry it. Let's see
892 * if that actually is a better idea. Exiting the loop wasn't we always
893 * crashed, which made sense (the rest of the engine was not prepared for
894 * that) -- rgerhards, 2008-05-19
895 */
896 if(pSel != NULL) { /* cleanup missing? happens during err exit! */
897 nssel.Destruct(&pSel);
898 }
899 }
900
901 pthread_cleanup_pop(1); /* execute and remove cleanup handler */
902
903 RETiRet;
904 }
905 PRAGMA_DIAGNOSTIC_POP
906
907 static rsRetVal
DoRun(tcpsrv_t * pThis,nspoll_t ** ppPoll)908 DoRun(tcpsrv_t *pThis, nspoll_t **ppPoll)
909 {
910 DEFiRet;
911 int i;
912 nsd_epworkset_t workset[128]; /* 128 is currently fixed num of concurrent requests */
913 int numEntries;
914 nspoll_t *pPoll = NULL;
915 rsRetVal localRet;
916
917 if((localRet = nspoll.Construct(ppPoll)) == RS_RET_OK) {
918 pPoll = *ppPoll;
919 if(pThis->pszDrvrName != NULL)
920 CHKiRet(nspoll.SetDrvrName(pPoll, pThis->pszDrvrName));
921 localRet = nspoll.ConstructFinalize(pPoll);
922 }
923 if(localRet != RS_RET_OK) {
924 /* fall back to select */
925 DBGPRINTF("tcpsrv could not use epoll() interface, iRet=%d, using select()\n", localRet);
926 iRet = RunSelect(pThis, workset, sizeof(workset)/sizeof(nsd_epworkset_t));
927 FINALIZE;
928 }
929
930 DBGPRINTF("tcpsrv uses epoll() interface, nsdpoll driver found\n");
931
932 /* flag that we are in epoll mode */
933 pThis->bUsingEPoll = RSTRUE;
934
935 /* Add the TCP listen sockets to the list of sockets to monitor */
936 for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
937 DBGPRINTF("Trying to add listener %d, pUsr=%p\n", i, pThis->ppLstn);
938 CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_ADD));
939 DBGPRINTF("Added listener %d\n", i);
940 }
941
942 while(glbl.GetGlobalInputTermState() == 0) {
943 numEntries = sizeof(workset)/sizeof(nsd_epworkset_t);
944 localRet = nspoll.Wait(pPoll, -1, &numEntries, workset);
945 if(glbl.GetGlobalInputTermState() == 1)
946 break; /* terminate input! */
947
948 /* check if we need to ignore the i/o ready state. We do this if we got an invalid
949 * return state. Validly, this can happen for RS_RET_EINTR, for other cases it may
950 * not be the right thing, but what is the right thing is really hard at this point...
951 */
952 if(localRet != RS_RET_OK)
953 continue;
954
955 processWorkset(pThis, pPoll, numEntries, workset);
956 }
957
958 /* remove the tcp listen sockets from the epoll set */
959 for(i = 0 ; i < pThis->iLstnCurr ; ++i) {
960 CHKiRet(nspoll.Ctl(pPoll, pThis->ppLstn[i], i, pThis->ppLstn, NSDPOLL_IN, NSDPOLL_DEL));
961 }
962
963 finalize_it:
964 RETiRet;
965 }
966
967
968 /* This function is called to gather input. It tries doing that via the epoll()
969 * interface. If the driver does not support that, it falls back to calling its
970 * select() equivalent.
971 * rgerhards, 2009-11-18
972 */
973 static rsRetVal
Run(tcpsrv_t * pThis)974 Run(tcpsrv_t *pThis)
975 {
976 DEFiRet;
977 nspoll_t *pPoll = NULL;
978
979 ISOBJ_TYPE_assert(pThis, tcpsrv);
980
981 if(pThis->iLstnCurr == 0) {
982 dbgprintf("tcpsrv: no listeneres at all (probably init error), terminating\n");
983 RETiRet; /* somewhat "dirty" exit to avoid issue with cancel handler */
984 }
985
986 /* check if we need to start the worker pool. Once it is running, all is
987 * well. Shutdown is done on modExit.
988 */
989 d_pthread_mutex_lock(&wrkrMut);
990 if(!bWrkrRunning) {
991 bWrkrRunning = 1;
992 startWorkerPool();
993 }
994 d_pthread_mutex_unlock(&wrkrMut);
995
996 /* We try to terminate cleanly, but install a cancellation clean-up
997 * handler in case we are cancelled.
998 */
999 pthread_cleanup_push(RunCancelCleanup, (void*) &pPoll);
1000 iRet = DoRun(pThis, &pPoll);
1001 pthread_cleanup_pop(1);
1002
1003 RETiRet;
1004 }
1005
1006
1007 /* Standard-Constructor */
1008 BEGINobjConstruct(tcpsrv) /* be sure to specify the object type also in END macro! */
1009 pThis->iSessMax = TCPSESS_MAX_DEFAULT;
1010 pThis->iLstnMax = TCPLSTN_MAX_DEFAULT;
1011 pThis->addtlFrameDelim = TCPSRV_NO_ADDTL_DELIMITER;
1012 pThis->maxFrameSize = 200000;
1013 pThis->bDisableLFDelim = 0;
1014 pThis->discardTruncatedMsg = 0;
1015 pThis->OnMsgReceive = NULL;
1016 pThis->dfltTZ[0] = '\0';
1017 pThis->bSPFramingFix = 0;
1018 pThis->ratelimitInterval = 0;
1019 pThis->ratelimitBurst = 10000;
1020 pThis->bUseFlowControl = 1;
1021 pThis->pszDrvrName = NULL;
1022 pThis->bPreserveCase = 1; /* preserve case in fromhost; default to true. */
1023 pThis->DrvrTlsVerifyDepth = 0;
ENDobjConstruct(tcpsrv)1024 ENDobjConstruct(tcpsrv)
1025
1026
1027 /* ConstructionFinalizer */
1028 static rsRetVal
1029 tcpsrvConstructFinalize(tcpsrv_t *pThis)
1030 {
1031 DEFiRet;
1032 ISOBJ_TYPE_assert(pThis, tcpsrv);
1033
1034 /* prepare network stream subsystem */
1035 CHKiRet(netstrms.Construct(&pThis->pNS));
1036 if(pThis->pszDrvrName != NULL)
1037 CHKiRet(netstrms.SetDrvrName(pThis->pNS, pThis->pszDrvrName));
1038 CHKiRet(netstrms.SetDrvrMode(pThis->pNS, pThis->iDrvrMode));
1039 CHKiRet(netstrms.SetDrvrCheckExtendedKeyUsage(pThis->pNS, pThis->DrvrChkExtendedKeyUsage));
1040 CHKiRet(netstrms.SetDrvrPrioritizeSAN(pThis->pNS, pThis->DrvrPrioritizeSan));
1041 CHKiRet(netstrms.SetDrvrTlsVerifyDepth(pThis->pNS, pThis->DrvrTlsVerifyDepth));
1042 if(pThis->pszDrvrAuthMode != NULL)
1043 CHKiRet(netstrms.SetDrvrAuthMode(pThis->pNS, pThis->pszDrvrAuthMode));
1044 /* Call SetDrvrPermitExpiredCerts required
1045 * when param is NULL default handling for ExpiredCerts is set! */
1046 CHKiRet(netstrms.SetDrvrPermitExpiredCerts(pThis->pNS, pThis->pszDrvrPermitExpiredCerts));
1047 CHKiRet(netstrms.SetDrvrTlsCAFile(pThis->pNS, pThis->pszDrvrCAFile));
1048 CHKiRet(netstrms.SetDrvrTlsKeyFile(pThis->pNS, pThis->pszDrvrKeyFile));
1049 CHKiRet(netstrms.SetDrvrTlsCertFile(pThis->pNS, pThis->pszDrvrCertFile));
1050 if(pThis->pPermPeers != NULL)
1051 CHKiRet(netstrms.SetDrvrPermPeers(pThis->pNS, pThis->pPermPeers));
1052 if(pThis->gnutlsPriorityString != NULL)
1053 CHKiRet(netstrms.SetDrvrGnutlsPriorityString(pThis->pNS, pThis->gnutlsPriorityString));
1054 CHKiRet(netstrms.ConstructFinalize(pThis->pNS));
1055
1056 /* set up listeners */
1057 CHKmalloc(pThis->ppLstn = calloc(pThis->iLstnMax, sizeof(netstrm_t*)));
1058 CHKmalloc(pThis->ppLstnPort = calloc(pThis->iLstnMax, sizeof(tcpLstnPortList_t*)));
1059 iRet = pThis->OpenLstnSocks(pThis);
1060
1061 finalize_it:
1062 if(iRet != RS_RET_OK) {
1063 if(pThis->pNS != NULL)
1064 netstrms.Destruct(&pThis->pNS);
1065 LogError(0, iRet, "tcpsrv could not create listener (inputname: '%s')",
1066 (pThis->pszInputName == NULL) ? (uchar*)"*UNSET*" : pThis->pszInputName);
1067 }
1068 RETiRet;
1069 }
1070
1071
1072 /* destructor for the tcpsrv object */
1073 BEGINobjDestruct(tcpsrv) /* be sure to specify the object type also in END and CODESTART macros! */
1074 CODESTARTobjDestruct(tcpsrv)
1075 if(pThis->OnDestruct != NULL)
1076 pThis->OnDestruct(pThis->pUsr);
1077
1078 deinit_tcp_listener(pThis);
1079
1080 if(pThis->pNS != NULL)
1081 netstrms.Destruct(&pThis->pNS);
1082 free(pThis->pszDrvrName);
1083 free(pThis->pszDrvrAuthMode);
1084 free(pThis->pszDrvrPermitExpiredCerts);
1085 free(pThis->pszDrvrCAFile);
1086 free(pThis->pszDrvrKeyFile);
1087 free(pThis->pszDrvrCertFile);
1088 free(pThis->ppLstn);
1089 free(pThis->ppLstnPort);
1090 free(pThis->pszInputName);
1091 free(pThis->pszOrigin);
1092 ENDobjDestruct(tcpsrv)
1093
1094
1095 /* debugprint for the tcpsrv object */
BEGINobjDebugPrint(tcpsrv)1096 BEGINobjDebugPrint(tcpsrv) /* be sure to specify the object type also in END and CODESTART macros! */
1097 CODESTARTobjDebugPrint(tcpsrv)
1098 ENDobjDebugPrint(tcpsrv)
1099
1100 /* set functions */
1101 static rsRetVal
1102 SetCBIsPermittedHost(tcpsrv_t *pThis, int (*pCB)(struct sockaddr *addr, char *fromHostFQDN, void*, void*))
1103 {
1104 DEFiRet;
1105 pThis->pIsPermittedHost = pCB;
1106 RETiRet;
1107 }
1108
1109 static rsRetVal
SetCBRcvData(tcpsrv_t * pThis,rsRetVal (* pRcvData)(tcps_sess_t *,char *,size_t,ssize_t *,int *))1110 SetCBRcvData(tcpsrv_t *pThis, rsRetVal (*pRcvData)(tcps_sess_t*, char*, size_t, ssize_t*, int*))
1111 {
1112 DEFiRet;
1113 pThis->pRcvData = pRcvData;
1114 RETiRet;
1115 }
1116
1117 static rsRetVal
SetCBOnListenDeinit(tcpsrv_t * pThis,rsRetVal (* pCB)(void *))1118 SetCBOnListenDeinit(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
1119 {
1120 DEFiRet;
1121 pThis->pOnListenDeinit = pCB;
1122 RETiRet;
1123 }
1124
1125 static rsRetVal
SetCBOnSessAccept(tcpsrv_t * pThis,rsRetVal (* pCB)(tcpsrv_t *,tcps_sess_t *))1126 SetCBOnSessAccept(tcpsrv_t *pThis, rsRetVal (*pCB)(tcpsrv_t*, tcps_sess_t*))
1127 {
1128 DEFiRet;
1129 pThis->pOnSessAccept = pCB;
1130 RETiRet;
1131 }
1132
1133 static rsRetVal
SetCBOnDestruct(tcpsrv_t * pThis,rsRetVal (* pCB)(void *))1134 SetCBOnDestruct(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
1135 {
1136 DEFiRet;
1137 pThis->OnDestruct = pCB;
1138 RETiRet;
1139 }
1140
1141 static rsRetVal
SetCBOnSessConstructFinalize(tcpsrv_t * pThis,rsRetVal (* pCB)(void *))1142 SetCBOnSessConstructFinalize(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
1143 {
1144 DEFiRet;
1145 pThis->OnSessConstructFinalize = pCB;
1146 RETiRet;
1147 }
1148
1149 static rsRetVal
SetCBOnSessDestruct(tcpsrv_t * pThis,rsRetVal (* pCB)(void *))1150 SetCBOnSessDestruct(tcpsrv_t *pThis, rsRetVal (*pCB)(void*))
1151 {
1152 DEFiRet;
1153 pThis->pOnSessDestruct = pCB;
1154 RETiRet;
1155 }
1156
1157 static rsRetVal
SetCBOnRegularClose(tcpsrv_t * pThis,rsRetVal (* pCB)(tcps_sess_t *))1158 SetCBOnRegularClose(tcpsrv_t *pThis, rsRetVal (*pCB)(tcps_sess_t*))
1159 {
1160 DEFiRet;
1161 pThis->pOnRegularClose = pCB;
1162 RETiRet;
1163 }
1164
1165 static rsRetVal
SetCBOnErrClose(tcpsrv_t * pThis,rsRetVal (* pCB)(tcps_sess_t *))1166 SetCBOnErrClose(tcpsrv_t *pThis, rsRetVal (*pCB)(tcps_sess_t*))
1167 {
1168 DEFiRet;
1169 pThis->pOnErrClose = pCB;
1170 RETiRet;
1171 }
1172
1173 static rsRetVal
SetCBOpenLstnSocks(tcpsrv_t * pThis,rsRetVal (* pCB)(tcpsrv_t *))1174 SetCBOpenLstnSocks(tcpsrv_t *pThis, rsRetVal (*pCB)(tcpsrv_t*))
1175 {
1176 DEFiRet;
1177 pThis->OpenLstnSocks = pCB;
1178 RETiRet;
1179 }
1180
1181 static rsRetVal
SetUsrP(tcpsrv_t * pThis,void * pUsr)1182 SetUsrP(tcpsrv_t *pThis, void *pUsr)
1183 {
1184 DEFiRet;
1185 pThis->pUsr = pUsr;
1186 RETiRet;
1187 }
1188
1189 static rsRetVal
SetKeepAlive(tcpsrv_t * pThis,int iVal)1190 SetKeepAlive(tcpsrv_t *pThis, int iVal)
1191 {
1192 DEFiRet;
1193 DBGPRINTF("tcpsrv: keep-alive set to %d\n", iVal);
1194 pThis->bUseKeepAlive = iVal;
1195 RETiRet;
1196 }
1197
1198 static rsRetVal
SetKeepAliveIntvl(tcpsrv_t * pThis,int iVal)1199 SetKeepAliveIntvl(tcpsrv_t *pThis, int iVal)
1200 {
1201 DEFiRet;
1202 DBGPRINTF("tcpsrv: keep-alive interval set to %d\n", iVal);
1203 pThis->iKeepAliveIntvl = iVal;
1204 RETiRet;
1205 }
1206
1207 static rsRetVal
SetKeepAliveProbes(tcpsrv_t * pThis,int iVal)1208 SetKeepAliveProbes(tcpsrv_t *pThis, int iVal)
1209 {
1210 DEFiRet;
1211 DBGPRINTF("tcpsrv: keep-alive probes set to %d\n", iVal);
1212 pThis->iKeepAliveProbes = iVal;
1213 RETiRet;
1214 }
1215
1216 static rsRetVal
SetKeepAliveTime(tcpsrv_t * pThis,int iVal)1217 SetKeepAliveTime(tcpsrv_t *pThis, int iVal)
1218 {
1219 DEFiRet;
1220 DBGPRINTF("tcpsrv: keep-alive timeout set to %d\n", iVal);
1221 pThis->iKeepAliveTime = iVal;
1222 RETiRet;
1223 }
1224
1225 static rsRetVal
SetGnutlsPriorityString(tcpsrv_t * pThis,uchar * iVal)1226 SetGnutlsPriorityString(tcpsrv_t *pThis, uchar *iVal)
1227 {
1228 DEFiRet;
1229 DBGPRINTF("tcpsrv: gnutlsPriorityString set to %s\n",
1230 (iVal == NULL) ? "(null)" : (const char*) iVal);
1231 pThis->gnutlsPriorityString = iVal;
1232 RETiRet;
1233 }
1234
1235
1236 static rsRetVal
SetOnMsgReceive(tcpsrv_t * pThis,rsRetVal (* OnMsgReceive)(tcps_sess_t *,uchar *,int))1237 SetOnMsgReceive(tcpsrv_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar*, int))
1238 {
1239 DEFiRet;
1240 assert(OnMsgReceive != NULL);
1241 pThis->OnMsgReceive = OnMsgReceive;
1242 RETiRet;
1243 }
1244
1245
1246 /* set enable/disable standard LF frame delimiter (use with care!)
1247 * -- rgerhards, 2010-01-03
1248 */
1249 static rsRetVal
SetbDisableLFDelim(tcpsrv_t * pThis,int bVal)1250 SetbDisableLFDelim(tcpsrv_t *pThis, int bVal)
1251 {
1252 DEFiRet;
1253 ISOBJ_TYPE_assert(pThis, tcpsrv);
1254 pThis->bDisableLFDelim = bVal;
1255 RETiRet;
1256 }
1257
1258
1259 /* discard the truncated msg part
1260 * -- PascalWithopf, 2017-04-20
1261 */
1262 static rsRetVal
SetDiscardTruncatedMsg(tcpsrv_t * pThis,int discard)1263 SetDiscardTruncatedMsg(tcpsrv_t *pThis, int discard)
1264 {
1265 DEFiRet;
1266 ISOBJ_TYPE_assert(pThis, tcpsrv);
1267 pThis->discardTruncatedMsg = discard;
1268 RETiRet;
1269 }
1270
1271
1272 /* Set additional framing to use (if any) -- rgerhards, 2008-12-10 */
1273 static rsRetVal
SetAddtlFrameDelim(tcpsrv_t * pThis,int iDelim)1274 SetAddtlFrameDelim(tcpsrv_t *pThis, int iDelim)
1275 {
1276 DEFiRet;
1277 ISOBJ_TYPE_assert(pThis, tcpsrv);
1278 pThis->addtlFrameDelim = iDelim;
1279 RETiRet;
1280 }
1281
1282
1283 /* Set max frame size for octet counted -- PascalWithopf, 2017-04-20*/
1284 static rsRetVal
SetMaxFrameSize(tcpsrv_t * pThis,int maxFrameSize)1285 SetMaxFrameSize(tcpsrv_t *pThis, int maxFrameSize)
1286 {
1287 DEFiRet;
1288 ISOBJ_TYPE_assert(pThis, tcpsrv);
1289 pThis->maxFrameSize = maxFrameSize;
1290 RETiRet;
1291 }
1292
1293
1294 static rsRetVal
SetDfltTZ(tcpsrv_t * const pThis,uchar * const tz)1295 SetDfltTZ(tcpsrv_t *const pThis, uchar *const tz)
1296 {
1297 DEFiRet;
1298 ISOBJ_TYPE_assert(pThis, tcpsrv);
1299 strncpy((char*)pThis->dfltTZ, (char*)tz, sizeof(pThis->dfltTZ));
1300 pThis->dfltTZ[sizeof(pThis->dfltTZ)-1] = '\0';
1301 RETiRet;
1302 }
1303
1304
1305 static rsRetVal
SetbSPFramingFix(tcpsrv_t * pThis,const sbool val)1306 SetbSPFramingFix(tcpsrv_t *pThis, const sbool val)
1307 {
1308 DEFiRet;
1309 ISOBJ_TYPE_assert(pThis, tcpsrv);
1310 pThis->bSPFramingFix = val;
1311 RETiRet;
1312 }
1313
1314 static rsRetVal
SetOrigin(tcpsrv_t * pThis,uchar * origin)1315 SetOrigin(tcpsrv_t *pThis, uchar *origin)
1316 {
1317 DEFiRet;
1318 free(pThis->pszOrigin);
1319 pThis->pszOrigin = (origin == NULL) ? NULL : ustrdup(origin);
1320 RETiRet;
1321 }
1322
1323 /* Set the input name to use -- rgerhards, 2008-12-10 */
1324 static rsRetVal
SetInputName(tcpsrv_t * const pThis,tcpLstnParams_t * const cnf_params,const uchar * const name)1325 SetInputName(tcpsrv_t *const pThis,tcpLstnParams_t *const cnf_params, const uchar *const name)
1326 {
1327 DEFiRet;
1328 ISOBJ_TYPE_assert(pThis, tcpsrv);
1329 if(name == NULL)
1330 cnf_params->pszInputName = NULL;
1331 else
1332 CHKmalloc(cnf_params->pszInputName = ustrdup(name));
1333 free(pThis->pszInputName); // TODO: REMOVE ME
1334 pThis->pszInputName = ustrdup("imtcp"); // TODO: REMOVE ME
1335
1336 /* we need to create a property */
1337 CHKiRet(prop.Construct(&cnf_params->pInputName));
1338 CHKiRet(prop.SetString(cnf_params->pInputName, cnf_params->pszInputName, ustrlen(cnf_params->pszInputName)));
1339 CHKiRet(prop.ConstructFinalize(cnf_params->pInputName));
1340 finalize_it:
1341 RETiRet;
1342 }
1343
1344
1345 /* Set the linux-like ratelimiter settings */
1346 static rsRetVal
SetLinuxLikeRatelimiters(tcpsrv_t * pThis,unsigned int ratelimitInterval,unsigned int ratelimitBurst)1347 SetLinuxLikeRatelimiters(tcpsrv_t *pThis, unsigned int ratelimitInterval, unsigned int ratelimitBurst)
1348 {
1349 DEFiRet;
1350 pThis->ratelimitInterval = ratelimitInterval;
1351 pThis->ratelimitBurst = ratelimitBurst;
1352 RETiRet;
1353 }
1354
1355
1356 /* Set connection close notification */
1357 static rsRetVal
SetNotificationOnRemoteClose(tcpsrv_t * pThis,int bNewVal)1358 SetNotificationOnRemoteClose(tcpsrv_t *pThis, int bNewVal)
1359 {
1360 DEFiRet;
1361 pThis->bEmitMsgOnClose = bNewVal;
1362 RETiRet;
1363 }
1364
1365
1366 /* here follows a number of methods that shuffle authentication settings down
1367 * to the drivers. Drivers not supporting these settings may return an error
1368 * state.
1369 * -------------------------------------------------------------------------- */
1370
1371 /* set the driver mode -- rgerhards, 2008-04-30 */
1372 static rsRetVal
SetDrvrMode(tcpsrv_t * pThis,const int iMode)1373 SetDrvrMode(tcpsrv_t *pThis, const int iMode)
1374 {
1375 DEFiRet;
1376 ISOBJ_TYPE_assert(pThis, tcpsrv);
1377 pThis->iDrvrMode = iMode;
1378 RETiRet;
1379 }
1380
1381 static rsRetVal
SetDrvrName(tcpsrv_t * pThis,uchar * const name)1382 SetDrvrName(tcpsrv_t *pThis, uchar *const name)
1383 {
1384 DEFiRet;
1385 ISOBJ_TYPE_assert(pThis, tcpsrv);
1386 free(pThis->pszDrvrName);
1387 CHKmalloc(pThis->pszDrvrName = ustrdup(name));
1388 finalize_it:
1389 RETiRet;
1390 }
1391
1392 /* set the driver authentication mode -- rgerhards, 2008-05-19 */
1393 static rsRetVal
SetDrvrAuthMode(tcpsrv_t * pThis,uchar * const mode)1394 SetDrvrAuthMode(tcpsrv_t *pThis, uchar *const mode)
1395 {
1396 DEFiRet;
1397 ISOBJ_TYPE_assert(pThis, tcpsrv);
1398 CHKmalloc(pThis->pszDrvrAuthMode = ustrdup(mode));
1399 finalize_it:
1400 RETiRet;
1401 }
1402
1403 /* set the driver permitexpiredcerts mode -- alorbach, 2018-12-20
1404 */
1405 static rsRetVal
SetDrvrPermitExpiredCerts(tcpsrv_t * pThis,uchar * mode)1406 SetDrvrPermitExpiredCerts(tcpsrv_t *pThis, uchar *mode)
1407 {
1408 DEFiRet;
1409 ISOBJ_TYPE_assert(pThis, tcpsrv);
1410 if (mode != NULL) {
1411 CHKmalloc(pThis->pszDrvrPermitExpiredCerts = ustrdup(mode));
1412 }
1413 finalize_it:
1414 RETiRet;
1415 }
1416
1417 static rsRetVal
SetDrvrCAFile(tcpsrv_t * const pThis,uchar * const mode)1418 SetDrvrCAFile(tcpsrv_t *const pThis, uchar *const mode)
1419 {
1420 DEFiRet;
1421 ISOBJ_TYPE_assert(pThis, tcpsrv);
1422 if (mode != NULL) {
1423 CHKmalloc(pThis->pszDrvrCAFile = ustrdup(mode));
1424 }
1425 finalize_it:
1426 RETiRet;
1427 }
1428
1429 static rsRetVal
SetDrvrKeyFile(tcpsrv_t * pThis,uchar * mode)1430 SetDrvrKeyFile(tcpsrv_t *pThis, uchar *mode)
1431 {
1432 DEFiRet;
1433 ISOBJ_TYPE_assert(pThis, tcpsrv);
1434 if (mode != NULL) {
1435 CHKmalloc(pThis->pszDrvrKeyFile = ustrdup(mode));
1436 }
1437 finalize_it:
1438 RETiRet;
1439 }
1440
1441 static rsRetVal
SetDrvrCertFile(tcpsrv_t * pThis,uchar * mode)1442 SetDrvrCertFile(tcpsrv_t *pThis, uchar *mode)
1443 {
1444 DEFiRet;
1445 ISOBJ_TYPE_assert(pThis, tcpsrv);
1446 if (mode != NULL) {
1447 CHKmalloc(pThis->pszDrvrCertFile = ustrdup(mode));
1448 }
1449 finalize_it:
1450 RETiRet;
1451 }
1452
1453
1454 /* set the driver's permitted peers -- rgerhards, 2008-05-19 */
1455 static rsRetVal
SetDrvrPermPeers(tcpsrv_t * pThis,permittedPeers_t * pPermPeers)1456 SetDrvrPermPeers(tcpsrv_t *pThis, permittedPeers_t *pPermPeers)
1457 {
1458 DEFiRet;
1459 ISOBJ_TYPE_assert(pThis, tcpsrv);
1460 pThis->pPermPeers = pPermPeers;
1461 RETiRet;
1462 }
1463
1464 /* set the driver cert extended key usage check setting -- jvymazal, 2019-08-16 */
1465 static rsRetVal
SetDrvrCheckExtendedKeyUsage(tcpsrv_t * pThis,int ChkExtendedKeyUsage)1466 SetDrvrCheckExtendedKeyUsage(tcpsrv_t *pThis, int ChkExtendedKeyUsage)
1467 {
1468 DEFiRet;
1469 ISOBJ_TYPE_assert(pThis, tcpsrv);
1470 pThis->DrvrChkExtendedKeyUsage = ChkExtendedKeyUsage;
1471 RETiRet;
1472 }
1473
1474 /* set the driver name checking policy -- jvymazal, 2019-08-16 */
1475 static rsRetVal
SetDrvrPrioritizeSAN(tcpsrv_t * pThis,int prioritizeSan)1476 SetDrvrPrioritizeSAN(tcpsrv_t *pThis, int prioritizeSan)
1477 {
1478 DEFiRet;
1479 ISOBJ_TYPE_assert(pThis, tcpsrv);
1480 pThis->DrvrPrioritizeSan = prioritizeSan;
1481 RETiRet;
1482 }
1483
1484 /* set the driver Set the driver tls verifyDepth -- alorbach, 2019-12-20 */
1485 static rsRetVal
SetDrvrTlsVerifyDepth(tcpsrv_t * pThis,int verifyDepth)1486 SetDrvrTlsVerifyDepth(tcpsrv_t *pThis, int verifyDepth)
1487 {
1488 DEFiRet;
1489 ISOBJ_TYPE_assert(pThis, tcpsrv);
1490 pThis->DrvrTlsVerifyDepth = verifyDepth;
1491 RETiRet;
1492 }
1493
/* End of methods to shuffle authentication settings to the driver.
 * -------------------------------------------------------------------------- */
1497
1498
1499 /* set max number of listeners
1500 * this must be called before ConstructFinalize, or it will have no effect!
1501 * rgerhards, 2009-08-17
1502 */
1503 static rsRetVal
SetLstnMax(tcpsrv_t * pThis,int iMax)1504 SetLstnMax(tcpsrv_t *pThis, int iMax)
1505 {
1506 DEFiRet;
1507 ISOBJ_TYPE_assert(pThis, tcpsrv);
1508 pThis->iLstnMax = iMax;
1509 RETiRet;
1510 }
1511
1512
1513 /* set if flow control shall be supported
1514 */
1515 static rsRetVal
SetUseFlowControl(tcpsrv_t * pThis,int bUseFlowControl)1516 SetUseFlowControl(tcpsrv_t *pThis, int bUseFlowControl)
1517 {
1518 DEFiRet;
1519 ISOBJ_TYPE_assert(pThis, tcpsrv);
1520 pThis->bUseFlowControl = bUseFlowControl;
1521 RETiRet;
1522 }
1523
1524
1525 /* set max number of sessions
1526 * this must be called before ConstructFinalize, or it will have no effect!
1527 * rgerhards, 2009-04-09
1528 */
1529 static rsRetVal
SetSessMax(tcpsrv_t * pThis,int iMax)1530 SetSessMax(tcpsrv_t *pThis, int iMax)
1531 {
1532 DEFiRet;
1533 ISOBJ_TYPE_assert(pThis, tcpsrv);
1534 pThis->iSessMax = iMax;
1535 RETiRet;
1536 }
1537
1538
1539 static rsRetVal
SetPreserveCase(tcpsrv_t * pThis,int bPreserveCase)1540 SetPreserveCase(tcpsrv_t *pThis, int bPreserveCase)
1541 {
1542 DEFiRet;
1543 ISOBJ_TYPE_assert(pThis, tcpsrv);
1544 pThis-> bPreserveCase = bPreserveCase;
1545 RETiRet;
1546 }
1547
1548
1549 /* queryInterface function
1550 * rgerhards, 2008-02-29
1551 */
1552 BEGINobjQueryInterface(tcpsrv)
1553 CODESTARTobjQueryInterface(tcpsrv)
1554 if(pIf->ifVersion != tcpsrvCURR_IF_VERSION) { /* check for current version, increment on each change */
1555 ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
1556 }
1557
1558 /* ok, we have the right interface, so let's fill it
1559 * Please note that we may also do some backwards-compatibility
1560 * work here (if we can support an older interface version - that,
1561 * of course, also affects the "if" above).
1562 */
1563 pIf->DebugPrint = tcpsrvDebugPrint;
1564 pIf->Construct = tcpsrvConstruct;
1565 pIf->ConstructFinalize = tcpsrvConstructFinalize;
1566 pIf->Destruct = tcpsrvDestruct;
1567
1568 pIf->configureTCPListen = configureTCPListen;
1569 pIf->create_tcp_socket = create_tcp_socket;
1570 pIf->Run = Run;
1571
1572 pIf->SetKeepAlive = SetKeepAlive;
1573 pIf->SetKeepAliveIntvl = SetKeepAliveIntvl;
1574 pIf->SetKeepAliveProbes = SetKeepAliveProbes;
1575 pIf->SetKeepAliveTime = SetKeepAliveTime;
1576 pIf->SetGnutlsPriorityString = SetGnutlsPriorityString;
1577 pIf->SetUsrP = SetUsrP;
1578 pIf->SetInputName = SetInputName;
1579 pIf->SetOrigin = SetOrigin;
1580 pIf->SetDfltTZ = SetDfltTZ;
1581 pIf->SetbSPFramingFix = SetbSPFramingFix;
1582 pIf->SetAddtlFrameDelim = SetAddtlFrameDelim;
1583 pIf->SetMaxFrameSize = SetMaxFrameSize;
1584 pIf->SetbDisableLFDelim = SetbDisableLFDelim;
1585 pIf->SetDiscardTruncatedMsg = SetDiscardTruncatedMsg;
1586 pIf->SetSessMax = SetSessMax;
1587 pIf->SetUseFlowControl = SetUseFlowControl;
1588 pIf->SetLstnMax = SetLstnMax;
1589 pIf->SetDrvrMode = SetDrvrMode;
1590 pIf->SetDrvrAuthMode = SetDrvrAuthMode;
1591 pIf->SetDrvrPermitExpiredCerts = SetDrvrPermitExpiredCerts;
1592 pIf->SetDrvrCAFile = SetDrvrCAFile;
1593 pIf->SetDrvrKeyFile = SetDrvrKeyFile;
1594 pIf->SetDrvrCertFile = SetDrvrCertFile;
1595 pIf->SetDrvrName = SetDrvrName;
1596 pIf->SetDrvrPermPeers = SetDrvrPermPeers;
1597 pIf->SetCBIsPermittedHost = SetCBIsPermittedHost;
1598 pIf->SetCBOpenLstnSocks = SetCBOpenLstnSocks;
1599 pIf->SetCBRcvData = SetCBRcvData;
1600 pIf->SetCBOnListenDeinit = SetCBOnListenDeinit;
1601 pIf->SetCBOnSessAccept = SetCBOnSessAccept;
1602 pIf->SetCBOnSessConstructFinalize = SetCBOnSessConstructFinalize;
1603 pIf->SetCBOnSessDestruct = SetCBOnSessDestruct;
1604 pIf->SetCBOnDestruct = SetCBOnDestruct;
1605 pIf->SetCBOnRegularClose = SetCBOnRegularClose;
1606 pIf->SetCBOnErrClose = SetCBOnErrClose;
1607 pIf->SetOnMsgReceive = SetOnMsgReceive;
1608 pIf->SetLinuxLikeRatelimiters = SetLinuxLikeRatelimiters;
1609 pIf->SetNotificationOnRemoteClose = SetNotificationOnRemoteClose;
1610 pIf->SetPreserveCase = SetPreserveCase;
1611 pIf->SetDrvrCheckExtendedKeyUsage = SetDrvrCheckExtendedKeyUsage;
1612 pIf->SetDrvrPrioritizeSAN = SetDrvrPrioritizeSAN;
1613 pIf->SetDrvrTlsVerifyDepth = SetDrvrTlsVerifyDepth;
1614
1615 finalize_it:
1616 ENDobjQueryInterface(tcpsrv)
1617
1618
1619 /* exit our class
1620 * rgerhards, 2008-03-10
1621 */
1622 BEGINObjClassExit(tcpsrv, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
1623 CODESTARTObjClassExit(tcpsrv)
1624 /* release objects we no longer need */
1625 objRelease(tcps_sess, DONT_LOAD_LIB);
1626 objRelease(conf, CORE_COMPONENT);
1627 objRelease(prop, CORE_COMPONENT);
1628 objRelease(statsobj, CORE_COMPONENT);
1629 objRelease(ruleset, CORE_COMPONENT);
1630 objRelease(glbl, CORE_COMPONENT);
1631 objRelease(netstrms, DONT_LOAD_LIB);
1632 objRelease(nssel, DONT_LOAD_LIB);
1633 objRelease(netstrm, LM_NETSTRMS_FILENAME);
1634 objRelease(net, LM_NET_FILENAME);
1635 ENDObjClassExit(tcpsrv)
1636
1637
1638 /* Initialize our class. Must be called as the very first method
1639 * before anything else is called inside this class.
1640 * rgerhards, 2008-02-29
1641 */
1642 BEGINObjClassInit(tcpsrv, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE class also in END MACRO! */
1643 /* request objects we use */
1644 CHKiRet(objUse(net, LM_NET_FILENAME));
1645 CHKiRet(objUse(netstrms, LM_NETSTRMS_FILENAME));
1646 CHKiRet(objUse(netstrm, DONT_LOAD_LIB));
1647 CHKiRet(objUse(nssel, DONT_LOAD_LIB));
1648 CHKiRet(objUse(nspoll, DONT_LOAD_LIB));
1649 CHKiRet(objUse(tcps_sess, DONT_LOAD_LIB));
1650 CHKiRet(objUse(conf, CORE_COMPONENT));
1651 CHKiRet(objUse(glbl, CORE_COMPONENT));
1652 CHKiRet(objUse(ruleset, CORE_COMPONENT));
1653 CHKiRet(objUse(statsobj, CORE_COMPONENT));
1654 CHKiRet(objUse(prop, CORE_COMPONENT));
1655
1656 /* set our own handlers */
1657 OBJSetMethodHandler(objMethod_DEBUGPRINT, tcpsrvDebugPrint);
1658 OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, tcpsrvConstructFinalize);
ENDObjClassInit(tcpsrv)1659 ENDObjClassInit(tcpsrv)
1660
1661
1662 /* start worker threads
1663 * Important: if we fork, this MUST be done AFTER forking
1664 */
1665 static void
1666 startWorkerPool(void)
1667 {
1668 int i;
1669 int r;
1670 pthread_attr_t sessThrdAttr;
1671
1672 /* We need to temporarily block all signals because the new thread
1673 * inherits our signal mask. There is a race if we do not block them
1674 * now, and we have seen in practice that this race causes grief.
1675 * So we 1. save the current set, 2. block evertyhing, 3. start
1676 * threads, and 4 reset the current set to saved state.
1677 * rgerhards, 2019-08-16
1678 */
1679 sigset_t sigSet, sigSetSave;
1680 sigfillset(&sigSet);
1681 pthread_sigmask(SIG_SETMASK, &sigSet, &sigSetSave);
1682
1683 wrkrRunning = 0;
1684 pthread_cond_init(&wrkrIdle, NULL);
1685 pthread_attr_init(&sessThrdAttr);
1686 pthread_attr_setstacksize(&sessThrdAttr, 4096*1024);
1687 for(i = 0 ; i < wrkrMax ; ++i) {
1688 /* init worker info structure! */
1689 pthread_cond_init(&wrkrInfo[i].run, NULL);
1690 wrkrInfo[i].pSrv = NULL;
1691 wrkrInfo[i].numCalled = 0;
1692 r = pthread_create(&wrkrInfo[i].tid, &sessThrdAttr, wrkr, &(wrkrInfo[i]));
1693 if(r == 0) {
1694 wrkrInfo[i].enabled = 1;
1695 } else {
1696 LogError(errno, NO_ERRCODE, "tcpsrv error creating thread");
1697 }
1698 }
1699 pthread_attr_destroy(&sessThrdAttr);
1700 pthread_sigmask(SIG_SETMASK, &sigSetSave, NULL);
1701 }
1702
1703 /* destroy worker pool structures and wait for workers to terminate
1704 */
1705 static void
stopWorkerPool(void)1706 stopWorkerPool(void)
1707 {
1708 int i;
1709 for(i = 0 ; i < wrkrMax ; ++i) {
1710 pthread_mutex_lock(&wrkrMut);
1711 pthread_cond_signal(&wrkrInfo[i].run); /* awake wrkr if not running */
1712 pthread_mutex_unlock(&wrkrMut);
1713 pthread_join(wrkrInfo[i].tid, NULL);
1714 DBGPRINTF("tcpsrv: info: worker %d was called %llu times\n", i, wrkrInfo[i].numCalled);
1715 pthread_cond_destroy(&wrkrInfo[i].run);
1716 }
1717 pthread_cond_destroy(&wrkrIdle);
1718 }
1719
1720
/* --------------- here now comes the plumbing that makes us a library module --------------- */
1722
1723 BEGINmodExit
1724 CODESTARTmodExit
1725 if(bWrkrRunning) {
1726 stopWorkerPool();
1727 bWrkrRunning = 0;
1728 }
1729 /* de-init in reverse order! */
1730 tcpsrvClassExit();
1731 tcps_sessClassExit();
1732 pthread_mutex_destroy(&wrkrMut);
1733 ENDmodExit
1734
1735
1736 BEGINqueryEtryPt
1737 CODESTARTqueryEtryPt
1738 CODEqueryEtryPt_STD_LIB_QUERIES
1739 ENDqueryEtryPt
1740
1741
1742 BEGINmodInit()
1743 CODESTARTmodInit
1744 *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
1745 /* we just init the worker mutex, but do not start the workers themselves. This is deferred
1746 * to the first call of Run(). Reasons for this:
1747 * 1. depending on load order, tcpsrv gets loaded during rsyslog startup BEFORE
1748 * it forks, in which case the workers would be running in the then-killed parent,
1749 * leading to a defuncnt child (we actually had this bug).
1750 * 2. depending on circumstances, Run() would possibly never be called, in which case
1751 * the worker threads would be totally useless.
1752 * Note that in order to guarantee a non-racy worker start, we need to guard the
1753 * startup sequence by a mutex, which is why we init it here (no problem with fork()
1754 * in this case as the mutex is a pure-memory structure).
1755 * rgerhards, 2012-05-18
1756 */
1757 pthread_mutex_init(&wrkrMut, NULL);
1758 bWrkrRunning = 0;
1759
1760 /* Initialize all classes that are in our module - this includes ourselfs */
1761 CHKiRet(tcps_sessClassInit(pModInfo));
1762 CHKiRet(tcpsrvClassInit(pModInfo)); /* must be done after tcps_sess, as we use it */
1763 ENDmodInit
1764