1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
21 #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
22 
23 #include <thrift/transport/TTransport.h>
24 #include <thrift/Thrift.h>
25 #include <thrift/TProcessor.h>
26 
27 #include <string>
28 #include <stdio.h>
29 
30 #include <boost/scoped_ptr.hpp>
31 #include <boost/shared_ptr.hpp>
32 
33 #include <thrift/concurrency/Mutex.h>
34 #include <thrift/concurrency/Monitor.h>
35 #include <thrift/concurrency/PlatformThreadFactory.h>
36 #include <thrift/concurrency/Thread.h>
37 
38 namespace apache
39 {
40 namespace thrift
41 {
42 namespace transport
43 {
44 
45 using apache::thrift::TProcessor;
46 using apache::thrift::protocol::TProtocolFactory;
47 using apache::thrift::concurrency::Mutex;
48 using apache::thrift::concurrency::Monitor;
49 
/**
 * Data pertaining to a single event.
 *
 * The struct owns the array referenced by eventBuff_: the destructor releases
 * it with delete[], so whatever is stored there must either be NULL or have
 * been allocated with new[].
 */
typedef struct eventInfo
{
    uint8_t* eventBuff_;     // event bytes; released with delete[] on destruction
    uint32_t eventSize_;     // starts at 0
    uint32_t eventBuffPos_;  // starts at 0

    /** Creates an empty event: no buffer, size and position both zero. */
    eventInfo(): eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}

    /** Releases the event buffer, if any. */
    ~eventInfo()
    {
        // delete[] on a null pointer is a no-op, so no explicit guard is needed.
        delete[] eventBuff_;
    }

private:
    // Copying is disabled (declared but intentionally never defined): a
    // member-wise copy would leave two objects pointing at the same
    // eventBuff_, and both destructors would delete[] it.
    eventInfo(const eventInfo&);
    eventInfo& operator=(const eventInfo&);
} eventInfo;
66 
67 // information about current read state
68 typedef struct readState
69 {
70     eventInfo* event_;
71 
72     // keep track of event size
73     uint8_t   eventSizeBuff_[4];
74     uint8_t   eventSizeBuffPos_;
75     bool      readingSize_;
76 
77     // read buffer variables
78     int32_t  bufferPtr_;
79     int32_t  bufferLen_;
80 
81     // last successful dispatch point
82     int32_t lastDispatchPtr_;
83 
resetStatereadState84     void resetState(uint32_t lastDispatchPtr)
85     {
86         readingSize_ = true;
87         eventSizeBuffPos_ = 0;
88         lastDispatchPtr_ = lastDispatchPtr;
89     }
90 
resetAllValuesreadState91     void resetAllValues()
92     {
93         resetState(0);
94         bufferPtr_ = 0;
95         bufferLen_ = 0;
96 
97         if (event_)
98         {
99             delete (event_);
100         }
101 
102         event_ = 0;
103     }
104 
getEventSizereadState105     inline uint32_t getEventSize()
106     {
107         const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_);
108         return *reinterpret_cast<const uint32_t*>(buffer);
109     }
110 
readStatereadState111     readState()
112     {
113         event_ = 0;
114         resetAllValues();
115     }
116 
~readStatereadState117     ~readState()
118     {
119         if (event_)
120         {
121             delete (event_);
122         }
123     }
124 
125 } readState;
126 
127 /**
128  * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
129  * to be written to disk.  Should be used in the following way:
130  *  1) Buffer created
131  *  2) Buffer written to (addEvent)
132  *  3) Buffer read from (getNext)
133  *  4) Buffer reset (reset)
134  *  5) Go back to 2, or destroy buffer
135  *
136  * The buffer should never be written to after it is read from, unless it is reset first.
137  * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
138  *       which uses the buffer in this way.
139  *
140  */
141 class TFileTransportBuffer
142 {
143 public:
144     TFileTransportBuffer(uint32_t size);
145     ~TFileTransportBuffer();
146 
147     bool addEvent(eventInfo* event);
148     eventInfo* getNext();
149     void reset();
150     bool isFull();
151     bool isEmpty();
152 
153 private:
154     TFileTransportBuffer(); // should not be used
155 
156     enum mode
157     {
158         WRITE,
159         READ
160     };
161     mode bufferMode_;
162 
163     uint32_t writePoint_;
164     uint32_t readPoint_;
165     uint32_t size_;
166     eventInfo** buffer_;
167 };
168 
169 /**
170  * Abstract interface for transports used to read files
171  */
172 class TFileReaderTransport : virtual public TTransport
173 {
174 public:
175     virtual int32_t getReadTimeout() = 0;
176     virtual void setReadTimeout(int32_t readTimeout) = 0;
177 
178     virtual uint32_t getNumChunks() = 0;
179     virtual uint32_t getCurChunk() = 0;
180     virtual void seekToChunk(int32_t chunk) = 0;
181     virtual void seekToEnd() = 0;
182 };
183 
184 /**
185  * Abstract interface for transports used to write files
186  */
187 class TFileWriterTransport : virtual public TTransport
188 {
189 public:
190     virtual uint32_t getChunkSize() = 0;
191     virtual void setChunkSize(uint32_t chunkSize) = 0;
192 };
193 
194 /**
195  * File implementation of a transport. Reads and writes are done to a
196  * file on disk.
197  *
198  */
199 class TFileTransport : public TFileReaderTransport,
200     public TFileWriterTransport
201 {
202 public:
203     TFileTransport(std::string path, bool readOnly = false);
204     ~TFileTransport();
205 
206     // TODO: what is the correct behaviour for this?
207     // the log file is generally always open
isOpen()208     bool isOpen()
209     {
210         return true;
211     }
212 
213     void write(const uint8_t* buf, uint32_t len);
214     void flush();
215 
216     uint32_t readAll(uint8_t* buf, uint32_t len);
217     uint32_t read(uint8_t* buf, uint32_t len);
218     bool peek();
219 
220     // log-file specific functions
221     void seekToChunk(int32_t chunk);
222     void seekToEnd();
223     uint32_t getNumChunks();
224     uint32_t getCurChunk();
225 
226     // for changing the output file
227     void resetOutputFile(int fd, std::string filename, off_t offset);
228 
229     // Setter/Getter functions for user-controllable options
setReadBuffSize(uint32_t readBuffSize)230     void setReadBuffSize(uint32_t readBuffSize)
231     {
232         if (readBuffSize)
233         {
234             readBuffSize_ = readBuffSize;
235         }
236     }
getReadBuffSize()237     uint32_t getReadBuffSize()
238     {
239         return readBuffSize_;
240     }
241 
242     static const int32_t TAIL_READ_TIMEOUT = -1;
243     static const int32_t NO_TAIL_READ_TIMEOUT = 0;
setReadTimeout(int32_t readTimeout)244     void setReadTimeout(int32_t readTimeout)
245     {
246         readTimeout_ = readTimeout;
247     }
getReadTimeout()248     int32_t getReadTimeout()
249     {
250         return readTimeout_;
251     }
252 
setChunkSize(uint32_t chunkSize)253     void setChunkSize(uint32_t chunkSize)
254     {
255         if (chunkSize)
256         {
257             chunkSize_ = chunkSize;
258         }
259     }
getChunkSize()260     uint32_t getChunkSize()
261     {
262         return chunkSize_;
263     }
264 
setEventBufferSize(uint32_t bufferSize)265     void setEventBufferSize(uint32_t bufferSize)
266     {
267         if (bufferAndThreadInitialized_)
268         {
269             GlobalOutput("Cannot change the buffer size after writer thread started");
270             return;
271         }
272 
273         eventBufferSize_ = bufferSize;
274     }
275 
getEventBufferSize()276     uint32_t getEventBufferSize()
277     {
278         return eventBufferSize_;
279     }
280 
setFlushMaxUs(uint32_t flushMaxUs)281     void setFlushMaxUs(uint32_t flushMaxUs)
282     {
283         if (flushMaxUs)
284         {
285             flushMaxUs_ = flushMaxUs;
286         }
287     }
getFlushMaxUs()288     uint32_t getFlushMaxUs()
289     {
290         return flushMaxUs_;
291     }
292 
setFlushMaxBytes(uint32_t flushMaxBytes)293     void setFlushMaxBytes(uint32_t flushMaxBytes)
294     {
295         if (flushMaxBytes)
296         {
297             flushMaxBytes_ = flushMaxBytes;
298         }
299     }
getFlushMaxBytes()300     uint32_t getFlushMaxBytes()
301     {
302         return flushMaxBytes_;
303     }
304 
setMaxEventSize(uint32_t maxEventSize)305     void setMaxEventSize(uint32_t maxEventSize)
306     {
307         maxEventSize_ = maxEventSize;
308     }
getMaxEventSize()309     uint32_t getMaxEventSize()
310     {
311         return maxEventSize_;
312     }
313 
setMaxCorruptedEvents(uint32_t maxCorruptedEvents)314     void setMaxCorruptedEvents(uint32_t maxCorruptedEvents)
315     {
316         maxCorruptedEvents_ = maxCorruptedEvents;
317     }
getMaxCorruptedEvents()318     uint32_t getMaxCorruptedEvents()
319     {
320         return maxCorruptedEvents_;
321     }
322 
setEofSleepTimeUs(uint32_t eofSleepTime)323     void setEofSleepTimeUs(uint32_t eofSleepTime)
324     {
325         if (eofSleepTime)
326         {
327             eofSleepTime_ = eofSleepTime;
328         }
329     }
getEofSleepTimeUs()330     uint32_t getEofSleepTimeUs()
331     {
332         return eofSleepTime_;
333     }
334 
335     /*
336      * Override TTransport *_virt() functions to invoke our implementations.
337      * We cannot use TVirtualTransport to provide these, since we need to inherit
338      * virtually from TTransport.
339      */
read_virt(uint8_t * buf,uint32_t len)340     virtual uint32_t read_virt(uint8_t* buf, uint32_t len)
341     {
342         return this->read(buf, len);
343     }
readAll_virt(uint8_t * buf,uint32_t len)344     virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len)
345     {
346         return this->readAll(buf, len);
347     }
write_virt(const uint8_t * buf,uint32_t len)348     virtual void write_virt(const uint8_t* buf, uint32_t len)
349     {
350         this->write(buf, len);
351     }
352 
353 private:
354     // helper functions for writing to a file
355     void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
356     bool swapEventBuffers(struct timeval* deadline);
357     bool initBufferAndWriteThread();
358 
359     // control for writer thread
startWriterThread(void * ptr)360     static void* startWriterThread(void* ptr)
361     {
362         static_cast<TFileTransport*>(ptr)->writerThread();
363         return NULL;
364     }
365     void writerThread();
366 
367     // helper functions for reading from a file
368     eventInfo* readEvent();
369 
370     // event corruption-related functions
371     bool isEventCorrupted();
372     void performRecovery();
373 
374     // Utility functions
375     void openLogFile();
376     void getNextFlushTime(struct timeval* ts_next_flush);
377 
378     // Class variables
379     readState readState_;
380     uint8_t* readBuff_;
381     eventInfo* currentEvent_;
382 
383     uint32_t readBuffSize_;
384     static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
385 
386     int32_t readTimeout_;
387     static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
388 
389     // size of chunks that file will be split up into
390     uint32_t chunkSize_;
391     static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
392 
393     // size of event buffers
394     uint32_t eventBufferSize_;
395     static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
396 
397     // max number of microseconds that can pass without flushing
398     uint32_t flushMaxUs_;
399     static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
400 
401     // max number of bytes that can be written without flushing
402     uint32_t flushMaxBytes_;
403     static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
404 
405     // max event size
406     uint32_t maxEventSize_;
407     static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
408 
409     // max number of corrupted events per chunk
410     uint32_t maxCorruptedEvents_;
411     static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
412 
413     // sleep duration when EOF is hit
414     uint32_t eofSleepTime_;
415     static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
416 
417     // sleep duration when a corrupted event is encountered
418     uint32_t corruptedEventSleepTime_;
419     static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
420 
421     // sleep duration in seconds when an IO error is encountered in the writer thread
422     uint32_t writerThreadIOErrorSleepTime_;
423     static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
424 
425     // writer thread
426     apache::thrift::concurrency::PlatformThreadFactory threadFactory_;
427     boost::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
428 
429     // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
430     // needs to be written to the file.  The buffers are swapped by the writer thread.
431     TFileTransportBuffer* dequeueBuffer_;
432     TFileTransportBuffer* enqueueBuffer_;
433 
434     // conditions used to block when the buffer is full or empty
435     Monitor notFull_, notEmpty_;
436     volatile bool closing_;
437 
438     // To keep track of whether the buffer has been flushed
439     Monitor flushed_;
440     volatile bool forceFlush_;
441 
442     // Mutex that is grabbed when enqueueing and swapping the read/write buffers
443     Mutex mutex_;
444 
445     // File information
446     std::string filename_;
447     int fd_;
448 
449     // Whether the writer thread and buffers have been initialized
450     bool bufferAndThreadInitialized_;
451 
452     // Offset within the file
453     off_t offset_;
454 
455     // event corruption information
456     uint32_t lastBadChunk_;
457     uint32_t numCorruptedEventsInChunk_;
458 
459     bool readOnly_;
460 };
461 
462 // Exception thrown when EOF is hit
463 class TEOFException : public TTransportException
464 {
465 public:
TEOFException()466     TEOFException():
467         TTransportException(TTransportException::END_OF_FILE) {};
468 };
469 
470 
471 // wrapper class to process events from a file containing thrift events
472 class TFileProcessor
473 {
474 public:
475     /**
476      * Constructor that defaults output transport to null transport
477      *
478      * @param processor processes log-file events
479      * @param protocolFactory protocol factory
480      * @param inputTransport file transport
481      */
482     TFileProcessor(boost::shared_ptr<TProcessor> processor,
483                    boost::shared_ptr<TProtocolFactory> protocolFactory,
484                    boost::shared_ptr<TFileReaderTransport> inputTransport);
485 
486     TFileProcessor(boost::shared_ptr<TProcessor> processor,
487                    boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
488                    boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
489                    boost::shared_ptr<TFileReaderTransport> inputTransport);
490 
491     /**
492      * Constructor
493      *
494      * @param processor processes log-file events
495      * @param protocolFactory protocol factory
496      * @param inputTransport input file transport
497      * @param output output transport
498      */
499     TFileProcessor(boost::shared_ptr<TProcessor> processor,
500                    boost::shared_ptr<TProtocolFactory> protocolFactory,
501                    boost::shared_ptr<TFileReaderTransport> inputTransport,
502                    boost::shared_ptr<TTransport> outputTransport);
503 
504     /**
505      * processes events from the file
506      *
507      * @param numEvents number of events to process (0 for unlimited)
508      * @param tail tails the file if true
509      */
510     void process(uint32_t numEvents, bool tail);
511 
512     /**
513      * process events until the end of the chunk
514      *
515      */
516     void processChunk();
517 
518 private:
519     boost::shared_ptr<TProcessor> processor_;
520     boost::shared_ptr<TProtocolFactory> inputProtocolFactory_;
521     boost::shared_ptr<TProtocolFactory> outputProtocolFactory_;
522     boost::shared_ptr<TFileReaderTransport> inputTransport_;
523     boost::shared_ptr<TTransport> outputTransport_;
524 };
525 
526 
527 }
528 }
529 } // apache::thrift::transport
530 
531 #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
532