1 /* Definition of the worker thread instance (wti) class.
2 *
3 * Copyright 2008-2017 Adiscon GmbH.
4 *
5 * This file is part of the rsyslog runtime library.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 * -or-
13 * see COPYING.ASL20 in the source distribution
14 *
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 */
21
22 #ifndef WTI_H_INCLUDED
23 #define WTI_H_INCLUDED
24
25 #include <pthread.h>
26 #include <stdlib.h>
27 #include "wtp.h"
28 #include "obj.h"
29 #include "batch.h"
30 #include "action.h"
31
32
/* Action states as tracked per worker thread instance.
 * NOTE: the state is kept in a 3-bit bit field (actWrkrInfo_t.flags.actState),
 * so the highest value that can be represented is 7.
 */
#define ACT_STATE_RDY      0	/* ready: action is waiting for a new transaction */
#define ACT_STATE_ITX      1	/* in transaction: waiting for new data or a commit */
/* value 2 is currently unused */
#define ACT_STATE_RTRY     3	/* a failure occurred; trying to re-establish the ready state */
#define ACT_STATE_SUSP     4	/* suspended due to failure (returns fail until the timeout expired) */
#define ACT_STATE_DATAFAIL 5	/* suspended due to a failure in the data itself, which means
				 * the message in question needs to be dropped as it will
				 * always fail. The action must still do a "normal" retry in
				 * order to bring it back to its regular state.
				 */
43
44 typedef struct actWrkrInfo {
45 action_t *pAction;
46 void *actWrkrData;
47 uint16_t uResumeOKinRow;/* number of times in a row that resume said OK with an
48 immediate failure following */
49 int iNbrResRtry; /* number of retries since last suspend */
50 sbool bHadAutoCommit; /* did an auto-commit happen during doAction()? */
51 struct {
52 unsigned actState : 3;
53 } flags;
54 union {
55 struct {
56 actWrkrIParams_t *iparams;/* dynamically sized array for transactional outputs */
57 int currIParam;
58 int maxIParams; /* current max */
59 } tx;
60 struct {
61 actWrkrIParams_t actParams[CONF_OMOD_NUMSTRINGS_MAXSIZE];
62 } nontx;
63 } p; /* short name for "parameters" */
64 } actWrkrInfo_t;
65
66 /* the worker thread instance class */
67 struct wti_s {
68 BEGINobjInstance;
69 pthread_t thrdID; /* thread ID */
70 int bIsRunning; /* is this thread currently running? (must be int for atomic op!) */
71 sbool bAlwaysRunning; /* should this thread always run? */
72 int *pbShutdownImmediate;/* end processing of this batch immediately if set to 1 */
73 wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
74 batch_t batch; /* pointer to an object array meaningful for current user
75 pointer (e.g. queue pUsr data elemt) */
76 uchar *pszDbgHdr; /* header string for debug messages */
77 actWrkrInfo_t *actWrkrInfo; /* *array* of action wrkr infos for all actions
78 (sized for max nbr of actions in config!) */
79 pthread_cond_t pcondBusy; /* condition to wake up the worker, protected by pmutUsr in wtp */
80 DEF_ATOMIC_HELPER_MUT(mutIsRunning)
81 struct {
82 uint8_t script_errno; /* errno-type interface for RainerScript functions */
83 uint8_t bPrevWasSuspended;
84 uint8_t bDoAutoCommit; /* do a commit after each message
85 * this is usually set for batches with 0 element, but may
86 * also be added as a user-selectable option (not implemented yet)
87 */
88 } execState; /* state for the execution engine */
89 };
90
91
92 /* prototypes */
93 rsRetVal wtiConstruct(wti_t **ppThis);
94 rsRetVal wtiConstructFinalize(wti_t * const pThis);
95 rsRetVal wtiDestruct(wti_t **ppThis);
96 rsRetVal wtiWorker(wti_t * const pThis);
97 rsRetVal wtiSetDbgHdr(wti_t * const pThis, uchar *pszMsg, size_t lenMsg);
98 uchar * ATTR_NONNULL() wtiGetDbgHdr(const wti_t *const pThis);
99 rsRetVal wtiCancelThrd(wti_t * const pThis, const uchar *const cancelobj);
100 void ATTR_NONNULL() wtiJoinThrd(wti_t *const pThis);
101 rsRetVal wtiSetAlwaysRunning(wti_t * const pThis);
102 rsRetVal wtiSetState(wti_t * const pThis, int bNew);
103 rsRetVal wtiWakeupThrd(wti_t * const pThis);
104 int wtiGetState(wti_t * const pThis);
105 wti_t *wtiGetDummy(void);
106 int ATTR_NONNULL() wtiWaitNonEmpty(wti_t *const pThis, const struct timespec timeout);
107 PROTOTYPEObjClassInit(wti);
108 PROTOTYPEObjClassExit(wti);
109 PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
110 PROTOTYPEpropSetMeth(wti, pWtp, wtp_t*);
111
/* Accessors for the per-action worker info kept inside a wti.
 * All of them are macros and evaluate their arguments as plain expressions;
 * the entry is selected by the action's iActionNbr.
 */

/* internal helper: the actWrkrInfo entry that belongs to action `act` */
#define wtiActWrkrInfoOf(wti, act) ((wti)->actWrkrInfo[(act)->iActionNbr])

/* action state (one of ACT_STATE_*), addressed by index or by action */
#define getActionStateByNbr(wti, nbr) ((uint8_t) ((wti)->actWrkrInfo[(nbr)].flags.actState))
#define getActionState(wti, act) ((uint8_t) wtiActWrkrInfoOf(wti, act).flags.actState)
#define setActionState(wti, act, state) (wtiActWrkrInfoOf(wti, act).flags.actState = (state))

/* uResumeOKinRow counter */
#define getActionResumeInRow(wti, act) (wtiActWrkrInfoOf(wti, act).uResumeOKinRow)
#define setActionResumeInRow(wti, act, v) (wtiActWrkrInfoOf(wti, act).uResumeOKinRow = (v))
#define incActionResumeInRow(wti, act) (wtiActWrkrInfoOf(wti, act).uResumeOKinRow++)

/* iNbrResRtry counter */
#define getActionNbrResRtry(wti, act) (wtiActWrkrInfoOf(wti, act).iNbrResRtry)
#define setActionNbrResRtry(wti, act, v) (wtiActWrkrInfoOf(wti, act).iNbrResRtry = (v))
#define incActionNbrResRtry(wti, act) (wtiActWrkrInfoOf(wti, act).iNbrResRtry++)

/* zero-fill a single actWrkrIParams_t */
#define wtiInitIParam(ip) (memset((ip), 0, sizeof(actWrkrIParams_t)))
123
/* Accessors for the errno-type status that script functions leave in the
 * worker's execution state (execState.script_errno).
 *
 * Both expansions are fully parenthesized so that they behave as a single
 * expression wherever they are used; without the outer parentheses on the
 * setter, an operator next to the macro call would be absorbed into the
 * assigned value (e.g. `wtiSetScriptErrno(p, 3) + 1` would store 4).
 */
#define wtiGetScriptErrno(pWti) ((pWti)->execState.script_errno)
#define wtiSetScriptErrno(pWti, newval) ((pWti)->execState.script_errno = (newval))
126
127 static inline uint8_t ATTR_UNUSED ATTR_NONNULL(1)
wtiGetPrevWasSuspended(const wti_t * const pWti)128 wtiGetPrevWasSuspended(const wti_t * const pWti)
129 {
130 assert(pWti != NULL);
131 return pWti->execState.bPrevWasSuspended;
132 }
133
134 static inline void __attribute__((unused))
wtiResetExecState(wti_t * const pWti,batch_t * const pBatch)135 wtiResetExecState(wti_t * const pWti, batch_t * const pBatch)
136 {
137 wtiSetScriptErrno(pWti, 0);
138 pWti->execState.bPrevWasSuspended = 0;
139 pWti->execState.bDoAutoCommit = (batchNumMsgs(pBatch) == 1);
140 }
141
142
143 rsRetVal wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams);
144 #endif /* #ifndef WTI_H_INCLUDED */
145