/* Definition of serial stream class (strm).
 *
 * A serial stream provides serial data access. In theory, serial streams
 * can be implemented via a number of methods (e.g. files or in-memory
 * streams). In practice, there currently only exists the file type (aka
 * "driver").
 *
 * In practice, many stream features are bound to files. I have not yet made
 * any serious effort, except for the naming of this class, to try to make
 * the interfaces very generic. However, I assume that we could work much
 * like in the strm class, where some properties are simply ignored when
 * the wrong strm mode is selected (which would translate here to the wrong
 * stream mode).
 *
 * Most importantly, this class provides generic input and output functions
 * which can directly be used to work with the strms and file output. It
 * provides such useful things as a circular file buffer and, hopefully
 * at a later stage, a lazy writer. The object is also serializable and thus
 * can easily be persisted. The bottom line is that it makes much sense to
 * use this class wherever possible as its features may grow in the future.
 *
 * An important note on writing gzip format via zlib (kept anonymous
 * by request):
 *
 * --------------------------------------------------------------------------
 * We'd like to make sure the output file is in full gzip format
 * (compatible with gzip -d/zcat etc). There is a flag in how the output
 * is initialized within zlib to properly add the gzip wrappers to the
 * output. (gzip is effectively a small metadata wrapper around raw
 * zstream output.)
 *
 * I had written an old bit of code to do this - the documentation on
 * deflateInit2() was pretty tricky to nail down on this specific feature:
 *
 * int deflateInit2 (z_streamp strm, int level, int method, int windowBits,
 * int memLevel, int strategy);
 *
 * I believe "31" would be the value for the "windowBits" field that you'd
 * want to try:
 *
 * deflateInit2(zstrmptr, 6, Z_DEFLATED, 31, 9, Z_DEFAULT_STRATEGY);
 * --------------------------------------------------------------------------
 *
 * Copyright 2008-2018 Rainer Gerhards and Adiscon GmbH.
 *
 * This file is part of the rsyslog runtime library.
 *
 * The rsyslog runtime library is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * The rsyslog runtime library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
 *
 * A copy of the GPL can be found in the file "COPYING" in this distribution.
 * A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
63 */ 64 65 #ifndef STREAM_H_INCLUDED 66 #define STREAM_H_INCLUDED 67 68 #include <regex.h> // TODO: fix via own module 69 #include <pthread.h> 70 #include <stdint.h> 71 #include <time.h> 72 #include "obj-types.h" 73 #include "glbl.h" 74 #include "stream.h" 75 #include "zlibw.h" 76 #include "cryprov.h" 77 78 /* stream types */ 79 typedef enum { 80 STREAMTYPE_FILE_SINGLE = 0, /**< read a single file */ 81 STREAMTYPE_FILE_CIRCULAR = 1, /**< circular files */ 82 STREAMTYPE_FILE_MONITOR = 2, /**< monitor a (third-party) file */ 83 STREAMTYPE_NAMED_PIPE = 3 /**< file is a named pipe (so far, tested for output only) */ 84 } strmType_t; 85 86 typedef enum { /* when extending, do NOT change existing modes! */ 87 STREAMMMODE_INVALID = 0, 88 STREAMMODE_READ = 1, 89 STREAMMODE_WRITE = 2, 90 STREAMMODE_WRITE_TRUNC = 3, 91 STREAMMODE_WRITE_APPEND = 4 92 } strmMode_t; 93 94 /* settings for stream rotation (applies not to all processing modes!) */ 95 #define STRM_ROTATION_DO_CHECK 0 96 #define STRM_ROTATION_DO_NOT_CHECK 1 97 98 #define STREAM_ASYNC_NUMBUFS 2 /* must be a power of 2 -- TODO: make configurable */ 99 /* The strm_t data structure */ 100 typedef struct strm_s { 101 BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */ 102 strmType_t sType; 103 /* descriptive properties */ 104 unsigned int iCurrFNum;/* current file number (NOT descriptor, but the number in the file name!) */ 105 uchar *pszFName; /* prefix for generated filenames */ 106 int lenFName; 107 strmMode_t tOperationsMode; 108 mode_t tOpenMode; 109 int64 iMaxFileSize;/* maximum size a file may grow to */ 110 unsigned int iMaxFiles; /* maximum number of files if a circular mode is in use */ 111 int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */ 112 sbool bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! 
*/ 113 int64 iCurrOffs;/* current offset */ 114 int64 *pUsrWCntr; /* NULL or a user-provided counter that receives the nbr of bytes written since 115 the last CntrSet() */ 116 sbool bPrevWasNL; /* used for readLine() when reading multi-line messages */ 117 /* dynamic properties, valid only during file open, not to be persistet */ 118 sbool bDisabled; /* should file no longer be written to? (currently set only if omfile file size limit fails) */ 119 sbool bSync; /* sync this file after every write? */ 120 sbool bReopenOnTruncate; 121 int rotationCheck; /* rotation check mode */ 122 size_t sIOBufSize;/* size of IO buffer */ 123 uchar *pszDir; /* Directory */ 124 int lenDir; 125 int fd; /* the file descriptor, -1 if closed */ 126 int fdDir; /* the directory's descriptor, in case bSync is requested (-1 if closed) */ 127 int readTimeout;/* 0: do not timeout */ 128 time_t lastRead;/* for timeout processing */ 129 ino_t inode; /* current inode for files being monitored (undefined else) */ 130 uchar *pszCurrFName; /* name of current file (if open) */ 131 uchar *pIOBuf; /* the iobuffer currently in use to gather data */ 132 char *pIOBuf_truncation; /* iobuffer used during trucation detection block re-reads */ 133 size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */ 134 size_t iBufPtr; /* pointer into current buffer */ 135 int iUngetC; /* char set via UngetChar() call or -1 if none set */ 136 sbool bInRecord; /* if 1, indicates that we are currently writing a not-yet complete record */ 137 int iZipLevel; /* zip level (0..9). If 0, zip is completely disabled */ 138 Bytef *pZipBuf; 139 /* support for async flush procesing */ 140 sbool bAsyncWrite; /* do asynchronous writes (always if a flush interval is given) */ 141 sbool bStopWriter; /* shall writer thread terminate? */ 142 sbool bDoTimedWait; /* instruct writer thread to do a times wait to support flush timeouts */ 143 sbool bzInitDone; /* did we do an init of zstrm already? 
*/ 144 sbool bFlushNow; /* shall we flush with the next async write? */ 145 sbool bVeryReliableZip; /* shall we write interim headers to create a very reliable ZIP file? */ 146 int iFlushInterval; /* flush in which interval - 0, no flushing */ 147 pthread_mutex_t mut;/* mutex for flush in async mode */ 148 pthread_cond_t notFull; 149 pthread_cond_t notEmpty; 150 pthread_cond_t isEmpty; 151 unsigned short iEnq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ 152 unsigned short iDeq; /* this MUST be unsigned as we use module arithmetic (else invalid indexing happens!) */ 153 cryprov_if_t *cryprov; /* ptr to crypto provider; NULL = do not encrypt */ 154 void *cryprovData; /* opaque data ptr for provider use */ 155 void *cryprovFileData;/* opaque data ptr for file instance */ 156 short iCnt; /* current nbr of elements in buffer */ 157 z_stream zstrm; /* zip stream to use */ 158 struct { 159 uchar *pBuf; 160 size_t lenBuf; 161 } asyncBuf[STREAM_ASYNC_NUMBUFS]; 162 pthread_t writerThreadID; 163 /* support for omfile size-limiting commands, special counters, NOT persisted! */ 164 off_t iSizeLimit; /* file size limit, 0 = no limit */ 165 uchar *pszSizeLimitCmd; /* command to carry out when size limit is reached */ 166 sbool bIsTTY; /* is this a tty file? */ 167 cstr_t *prevLineSegment; /* for ReadLine, previous, unprocessed part of file */ 168 cstr_t *prevMsgSegment; /* for ReadMultiLine, previous, yet unprocessed part of msg */ 169 int64 strtOffs; /* start offset in file for current line/msg */ 170 int fileNotFoundError; /* boolean; if set, report file not found errors, else silently ignore */ 171 int noRepeatedErrorOutput; /* if a file is missing the Error is only given once */ 172 int ignoringMsg; 173 } strm_t; 174 175 176 /* interfaces */ 177 BEGINinterface(strm) /* name must also be changed in ENDinterface macro! 
*/ 178 rsRetVal (*Construct)(strm_t **ppThis); 179 rsRetVal (*ConstructFinalize)(strm_t *pThis); 180 rsRetVal (*Destruct)(strm_t **ppThis); 181 rsRetVal (*SetFileName)(strm_t *pThis, uchar *pszName, size_t iLenName); 182 rsRetVal (*ReadChar)(strm_t *pThis, uchar *pC); 183 rsRetVal (*UnreadChar)(strm_t *pThis, uchar c); 184 rsRetVal (*SeekCurrOffs)(strm_t *pThis); 185 rsRetVal (*Write)(strm_t *const pThis, const uchar *const pBuf, size_t lenBuf); 186 rsRetVal (*WriteChar)(strm_t *pThis, uchar c); 187 rsRetVal (*WriteLong)(strm_t *pThis, long i); 188 rsRetVal (*SetFileNotFoundError)(strm_t *pThis, int pFileNotFoundError); 189 rsRetVal (*SetFName)(strm_t *pThis, uchar *pszPrefix, size_t iLenPrefix); 190 rsRetVal (*SetDir)(strm_t *pThis, uchar *pszDir, size_t iLenDir); 191 rsRetVal (*Flush)(strm_t *pThis); 192 rsRetVal (*RecordBegin)(strm_t *pThis); 193 rsRetVal (*RecordEnd)(strm_t *pThis); 194 rsRetVal (*Serialize)(strm_t *pThis, strm_t *pStrm); 195 rsRetVal (*GetCurrOffset)(strm_t *pThis, int64 *pOffs); 196 rsRetVal (*SetWCntr)(strm_t *pThis, number_t *pWCnt); 197 rsRetVal (*Dup)(strm_t *pThis, strm_t **ppNew); 198 INTERFACEpropSetMeth(strm, bDeleteOnClose, int); 199 INTERFACEpropSetMeth(strm, iMaxFileSize, int64); 200 INTERFACEpropSetMeth(strm, iMaxFiles, int); 201 INTERFACEpropSetMeth(strm, iFileNumDigits, int); 202 INTERFACEpropSetMeth(strm, tOperationsMode, int); 203 INTERFACEpropSetMeth(strm, tOpenMode, mode_t); 204 INTERFACEpropSetMeth(strm, sType, strmType_t); 205 INTERFACEpropSetMeth(strm, iZipLevel, int); 206 INTERFACEpropSetMeth(strm, bSync, int); 207 INTERFACEpropSetMeth(strm, bReopenOnTruncate, int); 208 INTERFACEpropSetMeth(strm, sIOBufSize, size_t); 209 INTERFACEpropSetMeth(strm, iSizeLimit, off_t); 210 INTERFACEpropSetMeth(strm, iFlushInterval, int); 211 INTERFACEpropSetMeth(strm, pszSizeLimitCmd, uchar*); 212 /* v6 added */ 213 rsRetVal (*ReadLine)(strm_t *pThis, cstr_t **ppCStr, uint8_t mode, sbool bEscapeLF, const uchar *, 214 uint32_t 
trimLineOverBytes, int64 *const strtOffs); 215 /* v7 added 2012-09-14 */ 216 INTERFACEpropSetMeth(strm, bVeryReliableZip, int); 217 /* v8 added 2013-03-21 */ 218 rsRetVal (*CheckFileChange)(strm_t *pThis); 219 /* v9 added 2013-04-04 */ 220 INTERFACEpropSetMeth(strm, cryprov, cryprov_if_t*); 221 INTERFACEpropSetMeth(strm, cryprovData, void*); 222 ENDinterface(strm) 223 #define strmCURR_IF_VERSION 14 /* increment whenever you change the interface structure! */ 224 /* V10, 2013-09-10: added new parameter bEscapeLF, changed mode to uint8_t (rgerhards) */ 225 /* V11, 2015-12-03: added new parameter bReopenOnTruncate */ 226 /* V12, 2015-12-11: added new parameter trimLineOverBytes, changed mode to uint32_t */ 227 /* V13, 2017-09-06: added new parameter strtoffs to ReadLine() */ 228 /* V14, 2019-11-13: added new parameter bEscapeLFString (rgerhards) */ 229 230 #define strmGetCurrFileNum(pStrm) ((pStrm)->iCurrFNum) 231 232 /* prototypes */ 233 PROTOTYPEObjClassInit(strm); 234 rsRetVal strmMultiFileSeek(strm_t *pThis, unsigned int fileNum, off64_t offs, off64_t *bytesDel); 235 rsRetVal ATTR_NONNULL(1,2) strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *start_preg, 236 regex_t *end_preg, const sbool bEscapeLF, const uchar *const escapeLFString, 237 const sbool discardTruncatedMsg, const sbool msgDiscardingError, int64 *const strtOffs); 238 int strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis); 239 void strmDebugOutBuf(const strm_t *const pThis); 240 void strmSetReadTimeout(strm_t *const __restrict__ pThis, const int val); 241 const uchar * ATTR_NONNULL() strmGetPrevLineSegment(strm_t *const pThis); 242 const uchar * ATTR_NONNULL() strmGetPrevMsgSegment(strm_t *const pThis); 243 int ATTR_NONNULL() strmGetPrevWasNL(const strm_t *const pThis); 244 void ATTR_NONNULL() strmSet_checkRotation(strm_t *const pThis, const int val); 245 246 #endif /* #ifndef STREAM_H_INCLUDED */ 247