1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <thrift/transport/TFileTransport.h>
23 #include <thrift/transport/TTransportUtils.h>
24 #include <thrift/transport/PlatformSocket.h>
25 #include <thrift/concurrency/FunctionRunner.h>
26 
27 #include <boost/version.hpp>
28 
29 #ifdef HAVE_SYS_TIME_H
30 #include <sys/time.h>
31 #else
32 #include <time.h>
33 #endif
34 #include <fcntl.h>
35 #ifdef HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef HAVE_STRINGS_H
39 #include <strings.h>
40 #endif
41 #include <cstdlib>
42 #include <cstring>
43 #include <iostream>
44 #include <limits>
45 #include <memory>
46 #ifdef HAVE_SYS_STAT_H
47 #include <sys/stat.h>
48 #endif
49 
50 #ifdef _WIN32
51 #include <io.h>
52 #endif
53 
54 namespace apache {
55 namespace thrift {
56 namespace transport {
57 
58 using std::shared_ptr;
59 using std::cerr;
60 using std::cout;
61 using std::endl;
62 using std::string;
63 using namespace apache::thrift::protocol;
64 using namespace apache::thrift::concurrency;
65 
TFileTransport(string path,bool readOnly,std::shared_ptr<TConfiguration> config)66 TFileTransport::TFileTransport(string path, bool readOnly, std::shared_ptr<TConfiguration> config)
67   : TTransport(config),
68     readState_(),
69     readBuff_(nullptr),
70     currentEvent_(nullptr),
71     readBuffSize_(DEFAULT_READ_BUFF_SIZE),
72     readTimeout_(NO_TAIL_READ_TIMEOUT),
73     chunkSize_(DEFAULT_CHUNK_SIZE),
74     eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
75     flushMaxUs_(DEFAULT_FLUSH_MAX_US),
76     flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
77     maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
78     maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
79     eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
80     corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
81     writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
82     dequeueBuffer_(nullptr),
83     enqueueBuffer_(nullptr),
84     notFull_(&mutex_),
85     notEmpty_(&mutex_),
86     closing_(false),
87     flushed_(&mutex_),
88     forceFlush_(false),
89     filename_(path),
90     fd_(0),
91     bufferAndThreadInitialized_(false),
92     offset_(0),
93     lastBadChunk_(0),
94     numCorruptedEventsInChunk_(0),
95     readOnly_(readOnly) {
96   threadFactory_.setDetached(false);
97   openLogFile();
98 }
99 
resetOutputFile(int fd,string filename,off_t offset)100 void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
101   filename_ = filename;
102   offset_ = offset;
103 
104   // check if current file is still open
105   if (fd_ > 0) {
106     // flush any events in the queue
107     flush();
108     GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
109     if (-1 == ::THRIFT_CLOSE(fd_)) {
110       int errno_copy = THRIFT_ERRNO;
111       GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
112       throw TTransportException(TTransportException::UNKNOWN,
113                                 "TFileTransport: error in file close",
114                                 errno_copy);
115     } else {
116       // successfully closed fd
117       fd_ = 0;
118     }
119   }
120 
121   if (fd) {
122     fd_ = fd;
123   } else {
124     // open file if the input fd is 0
125     openLogFile();
126   }
127 }
128 
~TFileTransport()129 TFileTransport::~TFileTransport() {
130   // flush the buffer if a writer thread is active
131   if (writerThread_.get()) {
132     // set state to closing
133     closing_ = true;
134 
135     // wake up the writer thread
136     // Since closing_ is true, it will attempt to flush all data, then exit.
137     notEmpty_.notify();
138 
139     writerThread_->join();
140     writerThread_.reset();
141   }
142 
143   if (dequeueBuffer_) {
144     delete dequeueBuffer_;
145     dequeueBuffer_ = nullptr;
146   }
147 
148   if (enqueueBuffer_) {
149     delete enqueueBuffer_;
150     enqueueBuffer_ = nullptr;
151   }
152 
153   if (readBuff_) {
154     delete[] readBuff_;
155     readBuff_ = nullptr;
156   }
157 
158   if (currentEvent_) {
159     delete currentEvent_;
160     currentEvent_ = nullptr;
161   }
162 
163   // close logfile
164   if (fd_ > 0) {
165     if (-1 == ::THRIFT_CLOSE(fd_)) {
166       GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
167     } else {
168       // successfully closed fd
169       fd_ = 0;
170     }
171   }
172 }
173 
initBufferAndWriteThread()174 bool TFileTransport::initBufferAndWriteThread() {
175   if (bufferAndThreadInitialized_) {
176     T_ERROR("%s", "Trying to double-init TFileTransport");
177     return false;
178   }
179 
180   if (!writerThread_.get()) {
181     writerThread_ = threadFactory_.newThread(
182         apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
183     writerThread_->start();
184   }
185 
186   dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
187   enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
188   bufferAndThreadInitialized_ = true;
189 
190   return true;
191 }
192 
write(const uint8_t * buf,uint32_t len)193 void TFileTransport::write(const uint8_t* buf, uint32_t len) {
194   if (readOnly_) {
195     throw TTransportException("TFileTransport: attempting to write to file opened readonly");
196   }
197 
198   enqueueEvent(buf, len);
199 }
200 
/**
 * Deleter functor for std::unique_ptr that releases a single object with
 * `delete`.
 *
 * The template parameter is named T rather than _T. Identifiers that begin
 * with an underscore followed by an uppercase letter are reserved for the
 * implementation, and `_T` in particular is a macro in MSVC's <tchar.h>.
 */
template <class T>
struct uniqueDeleter
{
  void operator()(T *ptr) const { delete ptr; }
};
206 
enqueueEvent(const uint8_t * buf,uint32_t eventLen)207 void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
208   // can't enqueue more events if file is going to close
209   if (closing_) {
210     return;
211   }
212 
213   // make sure that event size is valid
214   if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
215     T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
216     return;
217   }
218 
219   if (eventLen == 0) {
220     T_ERROR("%s", "cannot enqueue an empty event");
221     return;
222   }
223 
224   std::unique_ptr<eventInfo, uniqueDeleter<eventInfo> > toEnqueue(new eventInfo());
225   toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4];
226 
227   // first 4 bytes is the event length
228   memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
229   // actual event contents
230   memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
231   toEnqueue->eventSize_ = eventLen + 4;
232 
233   // lock mutex
234   Guard g(mutex_);
235 
236   // make sure that enqueue buffer is initialized and writer thread is running
237   if (!bufferAndThreadInitialized_) {
238     if (!initBufferAndWriteThread()) {
239       return;
240     }
241   }
242 
243   // Can't enqueue while buffer is full
244   while (enqueueBuffer_->isFull()) {
245     notFull_.wait();
246   }
247 
248   // We shouldn't be trying to enqueue new data while a forced flush is
249   // requested.  (Otherwise the writer thread might not ever be able to finish
250   // the flush if more data keeps being enqueued.)
251   assert(!forceFlush_);
252 
253   // add to the buffer
254   eventInfo* pEvent = toEnqueue.release();
255   if (!enqueueBuffer_->addEvent(pEvent)) {
256     delete pEvent;
257     return;
258   }
259 
260   // signal anybody who's waiting for the buffer to be non-empty
261   notEmpty_.notify();
262 
263   // this really should be a loop where it makes sure it got flushed
264   // because condition variables can get triggered by the os for no reason
265   // it is probably a non-factor for the time being
266 }
267 
swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> * deadline)268 bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) {
269   bool swap;
270   Guard g(mutex_);
271 
272   if (!enqueueBuffer_->isEmpty()) {
273     swap = true;
274   } else if (closing_) {
275     // even though there is no data to write,
276     // return immediately if the transport is closing
277     swap = false;
278   } else {
279     if (deadline != nullptr) {
280       // if we were handed a deadline time struct, do a timed wait
281       notEmpty_.waitForTime(*deadline);
282     } else {
283       // just wait until the buffer gets an item
284       notEmpty_.wait();
285     }
286 
287     // could be empty if we timed out
288     swap = enqueueBuffer_->isEmpty();
289   }
290 
291   if (swap) {
292     TFileTransportBuffer* temp = enqueueBuffer_;
293     enqueueBuffer_ = dequeueBuffer_;
294     dequeueBuffer_ = temp;
295   }
296 
297   if (swap) {
298     notFull_.notify();
299   }
300 
301   return swap;
302 }
303 
writerThread()304 void TFileTransport::writerThread() {
305   bool hasIOError = false;
306 
307   // open file if it is not open
308   if (!fd_) {
309     try {
310       openLogFile();
311     } catch (...) {
312       int errno_copy = THRIFT_ERRNO;
313       GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
314       fd_ = 0;
315       hasIOError = true;
316     }
317   }
318 
319   // set the offset to the correct value (EOF)
320   if (!hasIOError) {
321     try {
322       seekToEnd();
323       // throw away any partial events
324       offset_ += readState_.lastDispatchPtr_;
325       if (0 == THRIFT_FTRUNCATE(fd_, offset_)) {
326         readState_.resetAllValues();
327       } else {
328         int errno_copy = THRIFT_ERRNO;
329         GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy);
330         hasIOError = true;
331       }
332     } catch (...) {
333       int errno_copy = THRIFT_ERRNO;
334       GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
335       hasIOError = true;
336     }
337   }
338 
339   // Figure out the next time by which a flush must take place
340   auto ts_next_flush = getNextFlushTime();
341   uint32_t unflushed = 0;
342 
343   while (1) {
344     // this will only be true when the destructor is being invoked
345     if (closing_) {
346       if (hasIOError) {
347         return;
348       }
349 
350       // Try to empty buffers before exit
351       if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
352         ::THRIFT_FSYNC(fd_);
353         if (-1 == ::THRIFT_CLOSE(fd_)) {
354           int errno_copy = THRIFT_ERRNO;
355           GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
356         } else {
357           // fd successfully closed
358           fd_ = 0;
359         }
360         return;
361       }
362     }
363 
364     if (swapEventBuffers(&ts_next_flush)) {
365       eventInfo* outEvent;
366       while (nullptr != (outEvent = dequeueBuffer_->getNext())) {
367         // Remove an event from the buffer and write it out to disk. If there is any IO error, for
368         // instance,
369         // the output file is unmounted or deleted, then this event is dropped. However, the writer
370         // thread
371         // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
372         // start writing
373         // from the end.
374 
375         while (hasIOError) {
376           T_ERROR(
377               "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors",
378               writerThreadIOErrorSleepTime_);
379           THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
380           if (closing_) {
381             return;
382           }
383           if (!fd_) {
384             ::THRIFT_CLOSE(fd_);
385             fd_ = 0;
386           }
387           try {
388             openLogFile();
389             seekToEnd();
390             unflushed = 0;
391             hasIOError = false;
392             T_LOG_OPER(
393                 "TFileTransport: log file %s reopened by writer thread during error recovery",
394                 filename_.c_str());
395           } catch (...) {
396             T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
397                     filename_.c_str());
398           }
399         }
400 
401         // sanity check on event
402         if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
403           T_ERROR("msg size is greater than max event size: %u > %u\n",
404                   outEvent->eventSize_,
405                   maxEventSize_);
406           continue;
407         }
408 
409         // If chunking is required, then make sure that msg does not cross chunk boundary
410         if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
411           // event size must be less than chunk size
412           if (outEvent->eventSize_ > chunkSize_) {
413             T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
414                     outEvent->eventSize_,
415                     chunkSize_);
416             continue;
417           }
418 
419           int64_t chunk1 = offset_ / chunkSize_;
420           int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
421 
422           // if adding this event will cross a chunk boundary, pad the chunk with zeros
423           if (chunk1 != chunk2) {
424             // refetch the offset to keep in sync
425             offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
426             auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
427 
428             auto* zeros = new uint8_t[padding];
429             memset(zeros, '\0', padding);
430             boost::scoped_array<uint8_t> array(zeros);
431             if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) {
432               int errno_copy = THRIFT_ERRNO;
433               GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
434                                   errno_copy);
435               hasIOError = true;
436               continue;
437             }
438             unflushed += padding;
439             offset_ += padding;
440           }
441         }
442 
443         // write the dequeued event to the file
444         if (outEvent->eventSize_ > 0) {
445           if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
446             int errno_copy = THRIFT_ERRNO;
447             GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
448             hasIOError = true;
449             continue;
450           }
451           unflushed += outEvent->eventSize_;
452           offset_ += outEvent->eventSize_;
453         }
454       }
455       dequeueBuffer_->reset();
456     }
457 
458     if (hasIOError) {
459       continue;
460     }
461 
462     // Local variable to cache the state of forceFlush_.
463     //
464     // We only want to check the value of forceFlush_ once each time around the
465     // loop.  If we check it more than once without holding the lock the entire
466     // time, it could have changed state in between.  This will result in us
467     // making inconsistent decisions.
468     bool forced_flush = false;
469     {
470       Guard g(mutex_);
471       if (forceFlush_) {
472         if (!enqueueBuffer_->isEmpty()) {
473           // If forceFlush_ is true, we need to flush all available data.
474           // If enqueueBuffer_ is not empty, go back to the start of the loop to
475           // write it out.
476           //
477           // We know the main thread is waiting on forceFlush_ to be cleared,
478           // so no new events will be added to enqueueBuffer_ until we clear
479           // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
480           // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
481           // and clear forceFlush_ the next time around the loop.)
482           continue;
483         }
484         forced_flush = true;
485       }
486     }
487 
488     // determine if we need to perform an fsync
489     bool flush = false;
490     if (forced_flush || unflushed > flushMaxBytes_) {
491       flush = true;
492     } else {
493       if (std::chrono::steady_clock::now() > ts_next_flush) {
494         if (unflushed > 0) {
495           flush = true;
496         } else {
497           // If there is no new data since the last fsync,
498           // don't perform the fsync, but do reset the timer.
499           ts_next_flush = getNextFlushTime();
500         }
501       }
502     }
503 
504     if (flush) {
505       // sync (force flush) file to disk
506       THRIFT_FSYNC(fd_);
507       unflushed = 0;
508       ts_next_flush = getNextFlushTime();
509 
510       // notify anybody waiting for flush completion
511       if (forced_flush) {
512         Guard g(mutex_);
513         forceFlush_ = false;
514         assert(enqueueBuffer_->isEmpty());
515         assert(dequeueBuffer_->isEmpty());
516         flushed_.notifyAll();
517       }
518     }
519   }
520 }
521 
flush()522 void TFileTransport::flush() {
523   resetConsumedMessageSize();
524   // file must be open for writing for any flushing to take place
525   if (!writerThread_.get()) {
526     return;
527   }
528   // wait for flush to take place
529   Guard g(mutex_);
530 
531   // Indicate that we are requesting a flush
532   forceFlush_ = true;
533   // Wake up the writer thread so it will perform the flush immediately
534   notEmpty_.notify();
535 
536   while (forceFlush_) {
537     flushed_.wait();
538   }
539 }
540 
readAll(uint8_t * buf,uint32_t len)541 uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
542   checkReadBytesAvailable(len);
543   uint32_t have = 0;
544   uint32_t get = 0;
545 
546   while (have < len) {
547     get = read(buf + have, len - have);
548     if (get <= 0) {
549       throw TEOFException();
550     }
551     have += get;
552   }
553 
554   return have;
555 }
556 
peek()557 bool TFileTransport::peek() {
558   // check if there is an event ready to be read
559   if (!currentEvent_) {
560     currentEvent_ = readEvent();
561   }
562 
563   // did not manage to read an event from the file. This could have happened
564   // if the timeout expired or there was some other error
565   if (!currentEvent_) {
566     return false;
567   }
568 
569   // check if there is anything to read
570   return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0;
571 }
572 
read(uint8_t * buf,uint32_t len)573 uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
574   checkReadBytesAvailable(len);
575   // check if there an event is ready to be read
576   if (!currentEvent_) {
577     currentEvent_ = readEvent();
578   }
579 
580   // did not manage to read an event from the file. This could have happened
581   // if the timeout expired or there was some other error
582   if (!currentEvent_) {
583     return 0;
584   }
585 
586   // read as much of the current event as possible
587   int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
588   if (remaining <= (int32_t)len) {
589     // copy over anything thats remaining
590     if (remaining > 0) {
591       memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
592     }
593     delete (currentEvent_);
594     currentEvent_ = nullptr;
595     return remaining;
596   }
597 
598   // read as much as possible
599   memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
600   currentEvent_->eventBuffPos_ += len;
601   return len;
602 }
603 
// note caller is responsible for freeing returned events
//
// Reads the next complete event from the file.
//
// Data is pulled from fd_ into readBuff_ (readBuffSize_ bytes at a time). Each
// event is parsed as a 4-byte length header followed by that many payload
// bytes. A zero length header marks chunk padding and is skipped, as are bytes
// where a 4-byte header would not fit before the next chunk boundary.
//
// Behaviour at EOF depends on readTimeout_:
//   TAIL_READ_TIMEOUT    -> sleep eofSleepTime_ and keep polling
//   NO_TAIL_READ_TIMEOUT -> return nullptr
//   > 0                  -> sleep readTimeout_ ms once, then return nullptr
// NOTE(review): any other value falls through with an empty buffer and loops
// back to read() again without sleeping.
//
// Returns a heap-allocated eventInfo (eventBuffPos_ reset to 0), or nullptr as
// above. Throws TTransportException on a read error; performRecovery() may
// also throw when corruption cannot be skipped.
eventInfo* TFileTransport::readEvent() {
  int readTries = 0;

  if (!readBuff_) {
    readBuff_ = new uint8_t[readBuffSize_];
  }

  while (1) {
    // read from the file if read buffer is exhausted
    if (readState_.bufferPtr_ == readState_.bufferLen_) {
      // advance the offset pointer
      offset_ += readState_.bufferLen_;
      readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
      //       if (readState_.bufferLen_) {
      //         T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
      //       }
      readState_.bufferPtr_ = 0;
      readState_.lastDispatchPtr_ = 0;

      // read error (THRIFT_READ's -1, after the uint32_t cast above, still
      // compares equal to -1 here)
      if (readState_.bufferLen_ == -1) {
        readState_.resetAllValues();
        GlobalOutput("TFileTransport: error while reading from file");
        throw TTransportException("TFileTransport: error while reading from file");
      } else if (readState_.bufferLen_ == 0) { // EOF
        // wait indefinitely if there is no timeout
        if (readTimeout_ == TAIL_READ_TIMEOUT) {
          THRIFT_SLEEP_USEC(eofSleepTime_);
          continue;
        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
          // reset state
          readState_.resetState(0);
          return nullptr;
        } else if (readTimeout_ > 0) {
          // timeout already expired once
          if (readTries > 0) {
            readState_.resetState(0);
            return nullptr;
          } else {
            // readTimeout_ is in milliseconds; THRIFT_SLEEP_USEC takes microseconds
            THRIFT_SLEEP_USEC(readTimeout_ * 1000);
            readTries++;
            continue;
          }
        }
      }
    }

    // fresh data arrived, so the single timed retry is available again
    readTries = 0;

    // attempt to read an event from the buffer
    while (readState_.bufferPtr_ < readState_.bufferLen_) {
      if (readState_.readingSize_) {
        // Writers never let a 4-byte header straddle a chunk boundary, so a
        // header that would cross one cannot start here: skip forward.
        if (readState_.eventSizeBuffPos_ == 0) {
          if ((offset_ + readState_.bufferPtr_) / chunkSize_
              != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
            // skip one byte towards chunk boundary
            //            T_DEBUG_L(1, "Skipping a byte");
            readState_.bufferPtr_++;
            continue;
          }
        }

        // accumulate the header byte by byte (it may span two reads)
        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
            = readBuff_[readState_.bufferPtr_++];

        if (readState_.eventSizeBuffPos_ == 4) {
          if (readState_.getEventSize() == 0) {
            // 0 length event indicates padding
            //            T_DEBUG_L(1, "Got padding");
            readState_.resetState(readState_.lastDispatchPtr_);
            continue;
          }
          // got a valid event
          readState_.readingSize_ = false;
          if (readState_.event_) {
            delete (readState_.event_);
          }
          readState_.event_ = new eventInfo();
          readState_.event_->eventSize_ = readState_.getEventSize();

          // check if the event is corrupted and perform recovery if required
          if (isEventCorrupted()) {
            performRecovery();
            // start from the top
            break;
          }
        }
      } else {
        // payload phase: allocate the event buffer on first use
        if (!readState_.event_->eventBuff_) {
          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
          readState_.event_->eventBuffPos_ = 0;
        }
        // take either the entire event or the remaining bytes in the buffer
        int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
                                       readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);

        // copy data from read buffer into event buffer
        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
               readBuff_ + readState_.bufferPtr_,
               reclaimBuffer);

        // increment position ptrs
        readState_.event_->eventBuffPos_ += reclaimBuffer;
        readState_.bufferPtr_ += reclaimBuffer;

        // check if the event has been read in full
        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
          // set the completed event to the current event
          eventInfo* completeEvent = readState_.event_;
          completeEvent->eventBuffPos_ = 0;

          // ownership moves to the caller; remember where the next event starts
          readState_.event_ = nullptr;
          readState_.resetState(readState_.bufferPtr_);

          // exit criteria
          return completeEvent;
        }
      }
    }
  }
}
726 
isEventCorrupted()727 bool TFileTransport::isEventCorrupted() {
728   // an error is triggered if:
729   if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
730     // 1. Event size is larger than user-speficied max-event size
731     T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
732             readState_.event_->eventSize_,
733             maxEventSize_);
734     return true;
735   } else if (readState_.event_->eventSize_ > chunkSize_) {
736     // 2. Event size is larger than chunk size
737     T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
738             readState_.event_->eventSize_,
739             chunkSize_);
740     return true;
741   } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_)
742              != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)
743                  / chunkSize_)) {
744     // 3. size indicates that event crosses chunk boundary
745     T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u  Offset:%lu",
746             readState_.event_->eventSize_,
747             static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));
748 
749     return true;
750   }
751 
752   return false;
753 }
754 
performRecovery()755 void TFileTransport::performRecovery() {
756   // perform some kickass recovery
757   uint32_t curChunk = getCurChunk();
758   if (lastBadChunk_ == curChunk) {
759     numCorruptedEventsInChunk_++;
760   } else {
761     lastBadChunk_ = curChunk;
762     numCorruptedEventsInChunk_ = 1;
763   }
764 
765   if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
766     // maybe there was an error in reading the file from disk
767     // seek to the beginning of chunk and try again
768     seekToChunk(curChunk);
769   } else {
770 
771     // just skip ahead to the next chunk if we not already at the last chunk
772     if (curChunk != (getNumChunks() - 1)) {
773       seekToChunk(curChunk + 1);
774     } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
775       // if tailing the file, wait until there is enough data to start
776       // the next chunk
777       while (curChunk == (getNumChunks() - 1)) {
778         THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
779       }
780       seekToChunk(curChunk + 1);
781     } else {
782       // pretty hosed at this stage, rewind the file back to the last successful
783       // point and punt on the error
784       readState_.resetState(readState_.lastDispatchPtr_);
785       currentEvent_ = nullptr;
786       char errorMsg[1024];
787       sprintf(errorMsg,
788               "TFileTransport: log file corrupted at offset: %lu",
789               static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
790 
791       GlobalOutput(errorMsg);
792       throw TTransportException(errorMsg);
793     }
794   }
795 }
796 
seekToChunk(int32_t chunk)797 void TFileTransport::seekToChunk(int32_t chunk) {
798   if (fd_ <= 0) {
799     throw TTransportException("File not open");
800   }
801 
802   int32_t numChunks = getNumChunks();
803 
804   // file is empty, seeking to chunk is pointless
805   if (numChunks == 0) {
806     return;
807   }
808 
809   // negative indicates reverse seek (from the end)
810   if (chunk < 0) {
811     chunk += numChunks;
812   }
813 
814   // too large a value for reverse seek, just seek to beginning
815   if (chunk < 0) {
816     T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning...");
817     chunk = 0;
818   }
819 
820   // cannot seek past EOF
821   bool seekToEnd = false;
822   off_t minEndOffset = 0;
823   if (chunk >= numChunks) {
824     T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead...");
825     seekToEnd = true;
826     chunk = numChunks - 1;
827     // this is the min offset to process events till
828     minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END);
829   }
830 
831   off_t newOffset = off_t(chunk) * chunkSize_;
832   offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET);
833   readState_.resetAllValues();
834   currentEvent_ = nullptr;
835   if (offset_ == -1) {
836     GlobalOutput("TFileTransport: lseek error in seekToChunk");
837     throw TTransportException("TFileTransport: lseek error in seekToChunk");
838   }
839 
840   // seek to EOF if user wanted to go to last chunk
841   if (seekToEnd) {
842     uint32_t oldReadTimeout = getReadTimeout();
843     setReadTimeout(NO_TAIL_READ_TIMEOUT);
844     // keep on reading unti the last event at point of seekChunk call
845     shared_ptr<eventInfo> event;
846     while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
847       event.reset(readEvent());
848       if (event.get() == nullptr) {
849         break;
850       }
851     }
852     setReadTimeout(oldReadTimeout);
853   }
854 }
855 
seekToEnd()856 void TFileTransport::seekToEnd() {
857   seekToChunk(getNumChunks());
858 }
859 
getNumChunks()860 uint32_t TFileTransport::getNumChunks() {
861   if (fd_ <= 0) {
862     return 0;
863   }
864 
865   struct THRIFT_STAT f_info;
866   int rv = ::THRIFT_FSTAT(fd_, &f_info);
867 
868   if (rv < 0) {
869     int errno_copy = THRIFT_ERRNO;
870     throw TTransportException(TTransportException::UNKNOWN,
871                               "TFileTransport::getNumChunks() (fstat)",
872                               errno_copy);
873   }
874 
875   if (f_info.st_size > 0) {
876     size_t numChunks = ((f_info.st_size) / chunkSize_) + 1;
877     if (numChunks > (std::numeric_limits<uint32_t>::max)())
878       throw TTransportException("Too many chunks");
879     return static_cast<uint32_t>(numChunks);
880   }
881 
882   // empty file has no chunks
883   return 0;
884 }
885 
getCurChunk()886 uint32_t TFileTransport::getCurChunk() {
887   return static_cast<uint32_t>(offset_ / chunkSize_);
888 }
889 
890 // Utility Functions
openLogFile()891 void TFileTransport::openLogFile() {
892 #ifndef _WIN32
893   mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
894   int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
895 #else
896   int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
897   int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
898 #endif
899   fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode);
900   offset_ = 0;
901 
902   // make sure open call was successful
903   if (fd_ == -1) {
904     int errno_copy = THRIFT_ERRNO;
905     GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
906     throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
907   }
908 }
909 
getNextFlushTime()910 std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() {
911   return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_);
912 }
913 
TFileTransportBuffer(uint32_t size)914 TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
915   : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) {
916   buffer_ = new eventInfo* [size];
917 }
918 
~TFileTransportBuffer()919 TFileTransportBuffer::~TFileTransportBuffer() {
920   if (buffer_) {
921     for (uint32_t i = 0; i < writePoint_; i++) {
922       delete buffer_[i];
923     }
924     delete[] buffer_;
925     buffer_ = nullptr;
926   }
927 }
928 
addEvent(eventInfo * event)929 bool TFileTransportBuffer::addEvent(eventInfo* event) {
930   if (bufferMode_ == READ) {
931     GlobalOutput("Trying to write to a buffer in read mode");
932   }
933   if (writePoint_ < size_) {
934     buffer_[writePoint_++] = event;
935     return true;
936   } else {
937     // buffer is full
938     return false;
939   }
940 }
941 
getNext()942 eventInfo* TFileTransportBuffer::getNext() {
943   if (bufferMode_ == WRITE) {
944     bufferMode_ = READ;
945   }
946   if (readPoint_ < writePoint_) {
947     return buffer_[readPoint_++];
948   } else {
949     // no more entries
950     return nullptr;
951   }
952 }
953 
reset()954 void TFileTransportBuffer::reset() {
955   if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
956     T_DEBUG("%s", "Resetting a buffer with unread entries");
957   }
958   // Clean up the old entries
959   for (uint32_t i = 0; i < writePoint_; i++) {
960     delete buffer_[i];
961   }
962   bufferMode_ = WRITE;
963   writePoint_ = 0;
964   readPoint_ = 0;
965 }
966 
isFull()967 bool TFileTransportBuffer::isFull() {
968   return writePoint_ == size_;
969 }
970 
isEmpty()971 bool TFileTransportBuffer::isEmpty() {
972   return writePoint_ == 0;
973 }
974 
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport)975 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
976                                shared_ptr<TProtocolFactory> protocolFactory,
977                                shared_ptr<TFileReaderTransport> inputTransport)
978   : processor_(processor),
979     inputProtocolFactory_(protocolFactory),
980     outputProtocolFactory_(protocolFactory),
981     inputTransport_(inputTransport) {
982 
983   // default the output transport to a null transport (common case)
984   outputTransport_ = std::make_shared<TNullTransport>();
985 }
986 
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> inputProtocolFactory,shared_ptr<TProtocolFactory> outputProtocolFactory,shared_ptr<TFileReaderTransport> inputTransport)987 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
988                                shared_ptr<TProtocolFactory> inputProtocolFactory,
989                                shared_ptr<TProtocolFactory> outputProtocolFactory,
990                                shared_ptr<TFileReaderTransport> inputTransport)
991   : processor_(processor),
992     inputProtocolFactory_(inputProtocolFactory),
993     outputProtocolFactory_(outputProtocolFactory),
994     inputTransport_(inputTransport) {
995 
996   // default the output transport to a null transport (common case)
997   outputTransport_ = std::make_shared<TNullTransport>();
998 }
999 
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport,shared_ptr<TTransport> outputTransport)1000 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
1001                                shared_ptr<TProtocolFactory> protocolFactory,
1002                                shared_ptr<TFileReaderTransport> inputTransport,
1003                                shared_ptr<TTransport> outputTransport)
1004   : processor_(processor),
1005     inputProtocolFactory_(protocolFactory),
1006     outputProtocolFactory_(protocolFactory),
1007     inputTransport_(inputTransport),
1008     outputTransport_(outputTransport) {
1009 }
1010 
process(uint32_t numEvents,bool tail)1011 void TFileProcessor::process(uint32_t numEvents, bool tail) {
1012   shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1013   shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1014 
1015   // set the read timeout to 0 if tailing is required
1016   int32_t oldReadTimeout = inputTransport_->getReadTimeout();
1017   if (tail) {
1018     // save old read timeout so it can be restored
1019     inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
1020   }
1021 
1022   uint32_t numProcessed = 0;
1023   while (1) {
1024     // bad form to use exceptions for flow control but there is really
1025     // no other way around it
1026     try {
1027       processor_->process(inputProtocol, outputProtocol, nullptr);
1028       numProcessed++;
1029       if ((numEvents > 0) && (numProcessed == numEvents)) {
1030         return;
1031       }
1032     } catch (TEOFException&) {
1033       if (!tail) {
1034         break;
1035       }
1036     } catch (TException& te) {
1037       cerr << te.what() << endl;
1038       break;
1039     }
1040   }
1041 
1042   // restore old read timeout
1043   if (tail) {
1044     inputTransport_->setReadTimeout(oldReadTimeout);
1045   }
1046 }
1047 
processChunk()1048 void TFileProcessor::processChunk() {
1049   shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1050   shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1051 
1052   uint32_t curChunk = inputTransport_->getCurChunk();
1053 
1054   while (1) {
1055     // bad form to use exceptions for flow control but there is really
1056     // no other way around it
1057     try {
1058       processor_->process(inputProtocol, outputProtocol, nullptr);
1059       if (curChunk != inputTransport_->getCurChunk()) {
1060         break;
1061       }
1062     } catch (TEOFException&) {
1063       break;
1064     } catch (TException& te) {
1065       cerr << te.what() << endl;
1066       break;
1067     }
1068   }
1069 }
1070 }
1071 }
1072 } // apache::thrift::transport
1073