1 /* Definition of the worker thread instance (wti) class.
2  *
3  * Copyright 2008-2017 Adiscon GmbH.
4  *
5  * This file is part of the rsyslog runtime library.
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *       -or-
13  *       see COPYING.ASL20 in the source distribution
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  */
21 
22 #ifndef WTI_H_INCLUDED
23 #define WTI_H_INCLUDED
24 
25 #include <pthread.h>
26 #include <stdlib.h>
27 #include "wtp.h"
28 #include "obj.h"
29 #include "batch.h"
30 #include "action.h"
31 
32 
33 #define ACT_STATE_RDY  0	/* action ready, waiting for new transaction */
34 #define ACT_STATE_ITX  1	/* transaction active, waiting for new data or commit */
35 /* 2 currently not being used */
36 #define ACT_STATE_RTRY 3	/* failure occured, trying to restablish ready state */
37 #define ACT_STATE_SUSP 4	/* suspended due to failure (return fail until timeout expired) */
38 #define ACT_STATE_DATAFAIL 5	/* suspended due to failure in data, which means the message in
39 				   questions needs to be dropped as it will always fail. The
40 				   action must still do a "normal" retry in order to bring
41 				   it back to regular state. */
42 /* note: 3 bit bit field --> highest value is 7! */
43 
44 typedef struct actWrkrInfo {
45 	action_t *pAction;
46 	void *actWrkrData;
47 	uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an
48 				   immediate failure following */
49 	int	iNbrResRtry;	/* number of retries since last suspend */
50 	sbool	bHadAutoCommit;	/* did an auto-commit happen during doAction()? */
51 	struct {
52 		unsigned actState : 3;
53 	} flags;
54 	union {
55 		struct {
56 			actWrkrIParams_t *iparams;/* dynamically sized array for transactional outputs */
57 			int currIParam;
58 			int maxIParams;	/* current max */
59 		} tx;
60 		struct {
61 			actWrkrIParams_t actParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
62 		} nontx;
63 	} p; /* short name for "parameters" */
64 } actWrkrInfo_t;
65 
66 /* the worker thread instance class */
67 struct wti_s {
68 	BEGINobjInstance;
69 	pthread_t thrdID; 	/* thread ID */
70 	int bIsRunning;	/* is this thread currently running? (must be int for atomic op!) */
71 	sbool bAlwaysRunning;	/* should this thread always run? */
72 	int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
73 	wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
74 	batch_t batch; /* pointer to an object array meaningful for current user
75 			  pointer (e.g. queue pUsr data elemt) */
76 	uchar *pszDbgHdr;	/* header string for debug messages */
77 	actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions
78 				      (sized for max nbr of actions in config!) */
79 	pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
80 	DEF_ATOMIC_HELPER_MUT(mutIsRunning)
81 	struct {
82 		uint8_t	script_errno; /* errno-type interface for RainerScript functions */
83 		uint8_t bPrevWasSuspended;
84 		uint8_t bDoAutoCommit; /* do a commit after each message
85 		                        * this is usually set for batches with 0 element, but may
86 					* also be added as a user-selectable option (not implemented yet)
87 					*/
88 	} execState;	/* state for the execution engine */
89 };
90 
91 
92 /* prototypes */
93 rsRetVal wtiConstruct(wti_t **ppThis);
94 rsRetVal wtiConstructFinalize(wti_t * const pThis);
95 rsRetVal wtiDestruct(wti_t **ppThis);
96 rsRetVal wtiWorker(wti_t * const pThis);
97 rsRetVal wtiSetDbgHdr(wti_t * const pThis, uchar *pszMsg, size_t lenMsg);
98 uchar * ATTR_NONNULL() wtiGetDbgHdr(const wti_t *const pThis);
99 rsRetVal wtiCancelThrd(wti_t * const pThis, const uchar *const cancelobj);
100 void ATTR_NONNULL() wtiJoinThrd(wti_t *const pThis);
101 rsRetVal wtiSetAlwaysRunning(wti_t * const pThis);
102 rsRetVal wtiSetState(wti_t * const pThis, int bNew);
103 rsRetVal wtiWakeupThrd(wti_t * const pThis);
104 int wtiGetState(wti_t * const pThis);
105 wti_t *wtiGetDummy(void);
106 int ATTR_NONNULL() wtiWaitNonEmpty(wti_t *const pThis, const struct timespec timeout);
107 PROTOTYPEObjClassInit(wti);
108 PROTOTYPEObjClassExit(wti);
109 PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
110 PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
111 
112 #define getActionStateByNbr(pWti, iActNbr) ((uint8_t) ((pWti)->actWrkrInfo[(iActNbr)].flags.actState))
113 #define getActionState(pWti, pAction) (((uint8_t) (pWti)->actWrkrInfo[(pAction)->iActionNbr].flags.actState))
114 #define setActionState(pWti, pAction, newState) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].flags.actState = \
115 (newState))
116 #define getActionResumeInRow(pWti, pAction) (((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow))
117 #define setActionResumeInRow(pWti, pAction, val) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow = (val))
118 #define incActionResumeInRow(pWti, pAction) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].uResumeOKinRow++)
119 #define getActionNbrResRtry(pWti, pAction) (((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry))
120 #define setActionNbrResRtry(pWti, pAction, val) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry = (val))
121 #define incActionNbrResRtry(pWti, pAction) ((pWti)->actWrkrInfo[(pAction)->iActionNbr].iNbrResRtry++)
122 #define wtiInitIParam(piparams) (memset((piparams), 0, sizeof(actWrkrIParams_t)))
123 
124 #define wtiGetScriptErrno(pWti) ((pWti)->execState.script_errno)
125 #define wtiSetScriptErrno(pWti, newval) (pWti)->execState.script_errno = (newval)
126 
127 static inline uint8_t ATTR_UNUSED ATTR_NONNULL(1)
wtiGetPrevWasSuspended(const wti_t * const pWti)128 wtiGetPrevWasSuspended(const wti_t * const pWti)
129 {
130 	assert(pWti != NULL);
131 	return pWti->execState.bPrevWasSuspended;
132 }
133 
134 static inline void __attribute__((unused))
wtiResetExecState(wti_t * const pWti,batch_t * const pBatch)135 wtiResetExecState(wti_t * const pWti, batch_t * const pBatch)
136 {
137 	wtiSetScriptErrno(pWti, 0);
138 	pWti->execState.bPrevWasSuspended = 0;
139 	pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
140 }
141 
142 
143 rsRetVal wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams);
144 #endif /* #ifndef WTI_H_INCLUDED */
145