1 /* Definition of serial stream class (strm).
2  *
3  * A serial stream provides serial data access. In theory, serial streams
4  * can be implemented via a number of methods (e.g. files or in-memory
5  * streams). In practice, there currently only exist the file type (aka
6  * "driver").
7  *
8  * In practice, many stream features are bound to files. I have not yet made
9  * any serious effort, except for the naming of this class, to try to make
10  * the interfaces very generic. However, I assume that we could work much
11  * like in the strm class, where some properties are simply ignored when
12  * the wrong strm mode is selected (which would translate here to the wrong
13  * stream mode).
14  *
15  * Most importantly, this class provides generic input and output functions
16  * which can directly be used to work with the strms and file output. It
17  * provides such useful things like a circular file buffer and, hopefully
18  * at a later stage, a lazy writer. The object is also seriazable and thus
19  * can easily be persistet. The bottom line is that it makes much sense to
20  * use this class whereever possible as its features may grow in the future.
21  *
22  * An important note on writing gzip format via zlib (kept anonymous
23  * by request):
24  *
25  * --------------------------------------------------------------------------
26  * We'd like to make sure the output file is in full gzip format
27  * (compatible with gzip -d/zcat etc).  There is a flag in how the output
28  * is initialized within zlib to properly add the gzip wrappers to the
29  * output.  (gzip is effectively a small metadata wrapper around raw
30  * zstream output.)
31  *
32  * I had written an old bit of code to do this - the documentation on
33  * deflatInit2() was pretty tricky to nail down on this specific feature:
34  *
35  * int deflateInit2 (z_streamp strm, int level, int method, int windowBits,
36  * int memLevel, int strategy);
37  *
38  * I believe "31" would be the value for the "windowBits" field that you'd
39  * want to try:
40  *
41  * deflateInit2(zstrmptr, 6, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
42  * --------------------------------------------------------------------------
43  *
44  * Copyright 2008-2018 Rainer Gerhards and Adiscon GmbH.
45  *
46  * This file is part of the rsyslog runtime library.
47  *
48  * The rsyslog runtime library is free software: you can redistribute it and/or modify
49  * it under the terms of the GNU Lesser General Public License as published by
50  * the Free Software Foundation, either version 3 of the License, or
51  * (at your option) any later version.
52  *
53  * The rsyslog runtime library is distributed in the hope that it will be useful,
54  * but WITHOUT ANY WARRANTY; without even the implied warranty of
55  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
56  * GNU Lesser General Public License for more details.
57  *
58  * You should have received a copy of the GNU Lesser General Public License
59  * along with the rsyslog runtime library.  If not, see <http://www.gnu.org/licenses/>.
60  *
61  * A copy of the GPL can be found in the file "COPYING" in this distribution.
62  * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
63  */
64 
65 #ifndef STREAM_H_INCLUDED
66 #define STREAM_H_INCLUDED
67 
68 #include <regex.h> // TODO: fix via own module
69 #include <pthread.h>
70 #include <stdint.h>
71 #include <time.h>
72 #include "obj-types.h"
73 #include "glbl.h"
74 #include "stream.h"
75 #include "zlibw.h"
76 #include "cryprov.h"
77 
78 /* stream types */
79 typedef enum {
80 	STREAMTYPE_FILE_SINGLE = 0,	/**< read a single file */
81 	STREAMTYPE_FILE_CIRCULAR = 1,	/**< circular files */
82 	STREAMTYPE_FILE_MONITOR = 2,	/**< monitor a (third-party) file */
83 	STREAMTYPE_NAMED_PIPE = 3	/**< file is a named pipe (so far, tested for output only) */
84 } strmType_t;
85 
86 typedef enum {				/* when extending, do NOT change existing modes! */
87 	STREAMMMODE_INVALID = 0,
88 	STREAMMODE_READ = 1,
89 	STREAMMODE_WRITE = 2,
90 	STREAMMODE_WRITE_TRUNC = 3,
91 	STREAMMODE_WRITE_APPEND = 4
92 } strmMode_t;
93 
94 /* settings for stream rotation (applies not to all processing modes!) */
95 #define	STRM_ROTATION_DO_CHECK		0
96 #define	STRM_ROTATION_DO_NOT_CHECK	1
97 
98 #define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */
99 /* The strm_t data structure */
100 typedef struct strm_s {
101 	BEGINobjInstance;	/* Data to implement generic object - MUST be the first data element! */
102 	strmType_t sType;
103 	/* descriptive properties */
104 	unsigned int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */
105 	uchar *pszFName; /* prefix for generated filenames */
106 	int lenFName;
107 	strmMode_t tOperationsMode;
108 	mode_t tOpenMode;
109 	int64 iMaxFileSize;/* maximum size a file may grow to */
110 	unsigned int iMaxFiles;	/* maximum number of files if a circular mode is in use */
111 	int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
112 	sbool bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
113 	int64 iCurrOffs;/* current offset */
114 	int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since
115 	the last CntrSet() */
116 	sbool bPrevWasNL; /* used for readLine() when reading multi-line messages */
117 	/* dynamic properties, valid only during file open, not to be persistet */
118 	sbool bDisabled; /* should file no longer be written to? (currently set only if omfile file size limit fails) */
119 	sbool bSync;	/* sync this file after every write? */
120 	sbool bReopenOnTruncate;
121 	int rotationCheck; /* rotation check mode */
122 	size_t sIOBufSize;/* size of IO buffer */
123 	uchar *pszDir; /* Directory */
124 	int lenDir;
125 	int fd;		/* the file descriptor, -1 if closed */
126 	int fdDir;	/* the directory's descriptor, in case bSync is requested (-1 if closed) */
127 	int readTimeout;/* 0: do not timeout */
128 	time_t lastRead;/* for timeout processing */
129 	ino_t inode;	/* current inode for files being monitored (undefined else) */
130 	uchar *pszCurrFName; /* name of current file (if open) */
131 	uchar *pIOBuf;	/* the iobuffer currently in use to gather data */
132 	char *pIOBuf_truncation; /* iobuffer used during trucation detection block re-reads */
133 	size_t iBufPtrMax;	/* current max Ptr in Buffer (if partial read!) */
134 	size_t iBufPtr;	/* pointer into current buffer */
135 	int iUngetC;	/* char set via UngetChar() call or -1 if none set */
136 	sbool bInRecord;	/* if 1, indicates that we are currently writing a not-yet complete record */
137 	int iZipLevel;	/* zip level (0..9). If 0, zip is completely disabled */
138 	Bytef *pZipBuf;
139 	/* support for async flush procesing */
140 	sbool bAsyncWrite;	/* do asynchronous writes (always if a flush interval is given) */
141 	sbool bStopWriter;	/* shall writer thread terminate? */
142 	sbool bDoTimedWait;	/* instruct writer thread to do a times wait to support flush timeouts */
143 	sbool bzInitDone;	/* did we do an init of zstrm already? */
144 	sbool bFlushNow;	/* shall we flush with the next async write? */
145 	sbool bVeryReliableZip; /* shall we write interim headers to create a very reliable ZIP file? */
146 	int iFlushInterval; /* flush in which interval - 0, no flushing */
147 	pthread_mutex_t mut;/* mutex for flush in async mode */
148 	pthread_cond_t notFull;
149 	pthread_cond_t notEmpty;
150 	pthread_cond_t isEmpty;
151 	unsigned short iEnq;	/* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
152 	unsigned short iDeq;	/* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */
153 	cryprov_if_t *cryprov;  /* ptr to crypto provider; NULL = do not encrypt */
154 	void	*cryprovData;	/* opaque data ptr for provider use */
155 	void 	*cryprovFileData;/* opaque data ptr for file instance */
156 	short iCnt;	/* current nbr of elements in buffer */
157 	z_stream zstrm;	/* zip stream to use */
158 	struct {
159 		uchar *pBuf;
160 		size_t lenBuf;
161 	} asyncBuf[STREAM_ASYNC_NUMBUFS];
162 	pthread_t writerThreadID;
163 	/* support for omfile size-limiting commands, special counters, NOT persisted! */
164 	off_t	iSizeLimit;	/* file size limit, 0 = no limit */
165 	uchar	*pszSizeLimitCmd;	/* command to carry out when size limit is reached */
166 	sbool	bIsTTY;		/* is this a tty file? */
167 	cstr_t *prevLineSegment; /* for ReadLine, previous, unprocessed part of file */
168 	cstr_t *prevMsgSegment; /* for ReadMultiLine, previous, yet unprocessed part of msg */
169 	int64 strtOffs;		/* start offset in file for current line/msg */
170 	int fileNotFoundError;	/* boolean; if set, report file not found errors, else silently ignore */
171 	int noRepeatedErrorOutput; /* if a file is missing the Error is only given once */
172 	int ignoringMsg;
173 } strm_t;
174 
175 
176 /* interfaces */
177 BEGINinterface(strm) /* name must also be changed in ENDinterface macro! */
178 	rsRetVal (*Construct)(strm_t **ppThis);
179 	rsRetVal (*ConstructFinalize)(strm_t *pThis);
180 	rsRetVal (*Destruct)(strm_t **ppThis);
181 	rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName);
182 	rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC);
183 	rsRetVal (*UnreadChar)(strm_t *pThis, uchar c);
184 	rsRetVal (*SeekCurrOffs)(strm_t *pThis);
185 	rsRetVal (*Write)(strm_t *const pThis, const uchar *const pBuf, size_t lenBuf);
186 	rsRetVal (*WriteChar)(strm_t *pThis, uchar c);
187 	rsRetVal (*WriteLong)(strm_t *pThis, long i);
188 	rsRetVal (*SetFileNotFoundError)(strm_t *pThis, int pFileNotFoundError);
189 	rsRetVal (*SetFName)(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix);
190 	rsRetVal (*SetDir)(strm_t *pThis, uchar *pszDir, size_t iLenDir);
191 	rsRetVal (*Flush)(strm_t *pThis);
192 	rsRetVal (*RecordBegin)(strm_t *pThis);
193 	rsRetVal (*RecordEnd)(strm_t *pThis);
194 	rsRetVal (*Serialize)(strm_t *pThis, strm_t *pStrm);
195 	rsRetVal (*GetCurrOffset)(strm_t *pThis, int64 *pOffs);
196 	rsRetVal (*SetWCntr)(strm_t *pThis, number_t *pWCnt);
197 	rsRetVal (*Dup)(strm_t *pThis, strm_t **ppNew);
198 	INTERFACEpropSetMeth(strm, bDeleteOnClose, int);
199 	INTERFACEpropSetMeth(strm, iMaxFileSize, int64);
200 	INTERFACEpropSetMeth(strm, iMaxFiles, int);
201 	INTERFACEpropSetMeth(strm, iFileNumDigits, int);
202 	INTERFACEpropSetMeth(strm, tOperationsMode, int);
203 	INTERFACEpropSetMeth(strm, tOpenMode, mode_t);
204 	INTERFACEpropSetMeth(strm, sType, strmType_t);
205 	INTERFACEpropSetMeth(strm, iZipLevel, int);
206 	INTERFACEpropSetMeth(strm, bSync, int);
207 	INTERFACEpropSetMeth(strm, bReopenOnTruncate, int);
208 	INTERFACEpropSetMeth(strm, sIOBufSize, size_t);
209 	INTERFACEpropSetMeth(strm, iSizeLimit, off_t);
210 	INTERFACEpropSetMeth(strm, iFlushInterval, int);
211 	INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*);
212 	/* v6 added */
213 	rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, const uchar *,
214 		uint32_t trimLineOverBytes, int64 *const strtOffs);
215 	/* v7 added  2012-09-14 */
216 	INTERFACEpropSetMeth(strm, bVeryReliableZip, int);
217 	/* v8 added  2013-03-21 */
218 	rsRetVal (*CheckFileChange)(strm_t *pThis);
219 	/* v9 added  2013-04-04 */
220 	INTERFACEpropSetMeth(strm, cryprov, cryprov_if_t*);
221 	INTERFACEpropSetMeth(strm, cryprovData, void*);
222 ENDinterface(strm)
223 #define strmCURR_IF_VERSION 14 /* increment whenever you change the interface structure! */
224 /* V10, 2013-09-10: added new parameter bEscapeLF, changed mode to uint8_t (rgerhards) */
225 /* V11, 2015-12-03: added new parameter bReopenOnTruncate */
226 /* V12, 2015-12-11: added new parameter trimLineOverBytes, changed mode to uint32_t */
227 /* V13, 2017-09-06: added new parameter strtoffs to ReadLine() */
228 /* V14, 2019-11-13: added new parameter bEscapeLFString (rgerhards) */
229 
230 #define strmGetCurrFileNum(pStrm) ((pStrm)->iCurrFNum)
231 
232 /* prototypes */
233 PROTOTYPEObjClassInit(strm);
234 rsRetVal strmMultiFileSeek(strm_t *pThis, unsigned int fileNum, off64_t offs, off64_t *bytesDel);
235 rsRetVal ATTR_NONNULL(1,2) strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *start_preg,
236 	regex_t *end_preg, const sbool bEscapeLF, const uchar *const escapeLFString,
237 	const sbool discardTruncatedMsg, const sbool msgDiscardingError, int64 *const strtOffs);
238 int strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis);
239 void strmDebugOutBuf(const strm_t *const pThis);
240 void strmSetReadTimeout(strm_t *const __restrict__ pThis, const int val);
241 const uchar * ATTR_NONNULL() strmGetPrevLineSegment(strm_t *const pThis);
242 const uchar * ATTR_NONNULL() strmGetPrevMsgSegment(strm_t *const pThis);
243 int ATTR_NONNULL() strmGetPrevWasNL(const strm_t *const pThis);
244 void ATTR_NONNULL() strmSet_checkRotation(strm_t *const pThis, const int val);
245 
246 #endif /* #ifndef STREAM_H_INCLUDED */
247