1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <thrift/transport/TFileTransport.h>
23 #include <thrift/transport/TTransportUtils.h>
24 #include <thrift/transport/PlatformSocket.h>
25 #include <thrift/concurrency/FunctionRunner.h>
26 
27 #include <boost/version.hpp>
28 
29 #ifdef HAVE_SYS_TIME_H
30 #include <sys/time.h>
31 #else
32 #include <time.h>
33 #endif
34 #include <fcntl.h>
35 #ifdef HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef HAVE_STRINGS_H
39 #include <strings.h>
40 #endif
41 #include <cstdlib>
42 #include <cstring>
43 #include <iostream>
44 #include <limits>
45 #include <memory>
46 #ifdef HAVE_SYS_STAT_H
47 #include <sys/stat.h>
48 #endif
49 
50 #ifdef _WIN32
51 #include <io.h>
52 #endif
53 
54 namespace apache {
55 namespace thrift {
56 namespace transport {
57 
58 using std::shared_ptr;
59 using std::cerr;
60 using std::cout;
61 using std::endl;
62 using std::string;
63 using namespace apache::thrift::protocol;
64 using namespace apache::thrift::concurrency;
65 
/**
 * Constructs a file transport for the given path and opens the file.
 *
 * Every tunable is initialised from the corresponding DEFAULT_* constant, the
 * read buffer, the current event and both event buffers start out null, and
 * the file is opened right away through openLogFile(), which throws
 * TTransportException when the open fails.
 *
 * @param path     path of the file to open; copied into filename_
 * @param readOnly copied into readOnly_; write() throws when it is true
 */
TFileTransport::TFileTransport(string path, bool readOnly)
  : readState_(),
    readBuff_(nullptr),
    currentEvent_(nullptr),
    readBuffSize_(DEFAULT_READ_BUFF_SIZE),
    readTimeout_(NO_TAIL_READ_TIMEOUT),
    chunkSize_(DEFAULT_CHUNK_SIZE),
    eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
    flushMaxUs_(DEFAULT_FLUSH_MAX_US),
    flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
    maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
    maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
    eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
    corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
    writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
    dequeueBuffer_(nullptr),
    enqueueBuffer_(nullptr),
    // all three monitors are constructed on the same mutex_
    notFull_(&mutex_),
    notEmpty_(&mutex_),
    closing_(false),
    flushed_(&mutex_),
    forceFlush_(false),
    filename_(path),
    // 0 is what the rest of this class treats as "no file open"
    fd_(0),
    bufferAndThreadInitialized_(false),
    offset_(0),
    lastBadChunk_(0),
    numCorruptedEventsInChunk_(0),
    readOnly_(readOnly) {
  // the destructor join()s the writer thread, so it must not be detached
  threadFactory_.setDetached(false);
  openLogFile();
}
98 
resetOutputFile(int fd,string filename,off_t offset)99 void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
100   filename_ = filename;
101   offset_ = offset;
102 
103   // check if current file is still open
104   if (fd_ > 0) {
105     // flush any events in the queue
106     flush();
107     GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
108     if (-1 == ::THRIFT_CLOSE(fd_)) {
109       int errno_copy = THRIFT_ERRNO;
110       GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
111       throw TTransportException(TTransportException::UNKNOWN,
112                                 "TFileTransport: error in file close",
113                                 errno_copy);
114     } else {
115       // successfully closed fd
116       fd_ = 0;
117     }
118   }
119 
120   if (fd) {
121     fd_ = fd;
122   } else {
123     // open file if the input fd is 0
124     openLogFile();
125   }
126 }
127 
~TFileTransport()128 TFileTransport::~TFileTransport() {
129   // flush the buffer if a writer thread is active
130   if (writerThread_.get()) {
131     // set state to closing
132     closing_ = true;
133 
134     // wake up the writer thread
135     // Since closing_ is true, it will attempt to flush all data, then exit.
136     notEmpty_.notify();
137 
138     writerThread_->join();
139     writerThread_.reset();
140   }
141 
142   if (dequeueBuffer_) {
143     delete dequeueBuffer_;
144     dequeueBuffer_ = nullptr;
145   }
146 
147   if (enqueueBuffer_) {
148     delete enqueueBuffer_;
149     enqueueBuffer_ = nullptr;
150   }
151 
152   if (readBuff_) {
153     delete[] readBuff_;
154     readBuff_ = nullptr;
155   }
156 
157   if (currentEvent_) {
158     delete currentEvent_;
159     currentEvent_ = nullptr;
160   }
161 
162   // close logfile
163   if (fd_ > 0) {
164     if (-1 == ::THRIFT_CLOSE(fd_)) {
165       GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
166     } else {
167       // successfully closed fd
168       fd_ = 0;
169     }
170   }
171 }
172 
initBufferAndWriteThread()173 bool TFileTransport::initBufferAndWriteThread() {
174   if (bufferAndThreadInitialized_) {
175     T_ERROR("%s", "Trying to double-init TFileTransport");
176     return false;
177   }
178 
179   if (!writerThread_.get()) {
180     writerThread_ = threadFactory_.newThread(
181         apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
182     writerThread_->start();
183   }
184 
185   dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
186   enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
187   bufferAndThreadInitialized_ = true;
188 
189   return true;
190 }
191 
write(const uint8_t * buf,uint32_t len)192 void TFileTransport::write(const uint8_t* buf, uint32_t len) {
193   if (readOnly_) {
194     throw TTransportException("TFileTransport: attempting to write to file opened readonly");
195   }
196 
197   enqueueEvent(buf, len);
198 }
199 
/**
 * Deleter for std::unique_ptr that releases the pointee with plain `delete`.
 *
 * The template parameter is named `T` rather than `_T`: identifiers that
 * start with an underscore followed by an upper-case letter are reserved for
 * the implementation, and `_T` is also a macro in <tchar.h> on Windows.
 */
template <class T>
struct uniqueDeleter
{
  void operator()(T *ptr) const { delete ptr; }
};
205 
enqueueEvent(const uint8_t * buf,uint32_t eventLen)206 void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
207   // can't enqueue more events if file is going to close
208   if (closing_) {
209     return;
210   }
211 
212   // make sure that event size is valid
213   if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
214     T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
215     return;
216   }
217 
218   if (eventLen == 0) {
219     T_ERROR("%s", "cannot enqueue an empty event");
220     return;
221   }
222 
223   std::unique_ptr<eventInfo, uniqueDeleter<eventInfo> > toEnqueue(new eventInfo());
224   toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4];
225 
226   // first 4 bytes is the event length
227   memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
228   // actual event contents
229   memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
230   toEnqueue->eventSize_ = eventLen + 4;
231 
232   // lock mutex
233   Guard g(mutex_);
234 
235   // make sure that enqueue buffer is initialized and writer thread is running
236   if (!bufferAndThreadInitialized_) {
237     if (!initBufferAndWriteThread()) {
238       return;
239     }
240   }
241 
242   // Can't enqueue while buffer is full
243   while (enqueueBuffer_->isFull()) {
244     notFull_.wait();
245   }
246 
247   // We shouldn't be trying to enqueue new data while a forced flush is
248   // requested.  (Otherwise the writer thread might not ever be able to finish
249   // the flush if more data keeps being enqueued.)
250   assert(!forceFlush_);
251 
252   // add to the buffer
253   eventInfo* pEvent = toEnqueue.release();
254   if (!enqueueBuffer_->addEvent(pEvent)) {
255     delete pEvent;
256     return;
257   }
258 
259   // signal anybody who's waiting for the buffer to be non-empty
260   notEmpty_.notify();
261 
262   // this really should be a loop where it makes sure it got flushed
263   // because condition variables can get triggered by the os for no reason
264   // it is probably a non-factor for the time being
265 }
266 
swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> * deadline)267 bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) {
268   bool swap;
269   Guard g(mutex_);
270 
271   if (!enqueueBuffer_->isEmpty()) {
272     swap = true;
273   } else if (closing_) {
274     // even though there is no data to write,
275     // return immediately if the transport is closing
276     swap = false;
277   } else {
278     if (deadline != nullptr) {
279       // if we were handed a deadline time struct, do a timed wait
280       notEmpty_.waitForTime(*deadline);
281     } else {
282       // just wait until the buffer gets an item
283       notEmpty_.wait();
284     }
285 
286     // could be empty if we timed out
287     swap = enqueueBuffer_->isEmpty();
288   }
289 
290   if (swap) {
291     TFileTransportBuffer* temp = enqueueBuffer_;
292     enqueueBuffer_ = dequeueBuffer_;
293     dequeueBuffer_ = temp;
294   }
295 
296   if (swap) {
297     notFull_.notify();
298   }
299 
300   return swap;
301 }
302 
writerThread()303 void TFileTransport::writerThread() {
304   bool hasIOError = false;
305 
306   // open file if it is not open
307   if (!fd_) {
308     try {
309       openLogFile();
310     } catch (...) {
311       int errno_copy = THRIFT_ERRNO;
312       GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
313       fd_ = 0;
314       hasIOError = true;
315     }
316   }
317 
318   // set the offset to the correct value (EOF)
319   if (!hasIOError) {
320     try {
321       seekToEnd();
322       // throw away any partial events
323       offset_ += readState_.lastDispatchPtr_;
324       if (0 == THRIFT_FTRUNCATE(fd_, offset_)) {
325         readState_.resetAllValues();
326       } else {
327         int errno_copy = THRIFT_ERRNO;
328         GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy);
329         hasIOError = true;
330       }
331     } catch (...) {
332       int errno_copy = THRIFT_ERRNO;
333       GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
334       hasIOError = true;
335     }
336   }
337 
338   // Figure out the next time by which a flush must take place
339   auto ts_next_flush = getNextFlushTime();
340   uint32_t unflushed = 0;
341 
342   while (1) {
343     // this will only be true when the destructor is being invoked
344     if (closing_) {
345       if (hasIOError) {
346         return;
347       }
348 
349       // Try to empty buffers before exit
350       if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
351         ::THRIFT_FSYNC(fd_);
352         if (-1 == ::THRIFT_CLOSE(fd_)) {
353           int errno_copy = THRIFT_ERRNO;
354           GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
355         } else {
356           // fd successfully closed
357           fd_ = 0;
358         }
359         return;
360       }
361     }
362 
363     if (swapEventBuffers(&ts_next_flush)) {
364       eventInfo* outEvent;
365       while (nullptr != (outEvent = dequeueBuffer_->getNext())) {
366         // Remove an event from the buffer and write it out to disk. If there is any IO error, for
367         // instance,
368         // the output file is unmounted or deleted, then this event is dropped. However, the writer
369         // thread
370         // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
371         // start writing
372         // from the end.
373 
374         while (hasIOError) {
375           T_ERROR(
376               "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors",
377               writerThreadIOErrorSleepTime_);
378           THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
379           if (closing_) {
380             return;
381           }
382           if (!fd_) {
383             ::THRIFT_CLOSE(fd_);
384             fd_ = 0;
385           }
386           try {
387             openLogFile();
388             seekToEnd();
389             unflushed = 0;
390             hasIOError = false;
391             T_LOG_OPER(
392                 "TFileTransport: log file %s reopened by writer thread during error recovery",
393                 filename_.c_str());
394           } catch (...) {
395             T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
396                     filename_.c_str());
397           }
398         }
399 
400         // sanity check on event
401         if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
402           T_ERROR("msg size is greater than max event size: %u > %u\n",
403                   outEvent->eventSize_,
404                   maxEventSize_);
405           continue;
406         }
407 
408         // If chunking is required, then make sure that msg does not cross chunk boundary
409         if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
410           // event size must be less than chunk size
411           if (outEvent->eventSize_ > chunkSize_) {
412             T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
413                     outEvent->eventSize_,
414                     chunkSize_);
415             continue;
416           }
417 
418           int64_t chunk1 = offset_ / chunkSize_;
419           int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
420 
421           // if adding this event will cross a chunk boundary, pad the chunk with zeros
422           if (chunk1 != chunk2) {
423             // refetch the offset to keep in sync
424             offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
425             auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
426 
427             auto* zeros = new uint8_t[padding];
428             memset(zeros, '\0', padding);
429             boost::scoped_array<uint8_t> array(zeros);
430             if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) {
431               int errno_copy = THRIFT_ERRNO;
432               GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
433                                   errno_copy);
434               hasIOError = true;
435               continue;
436             }
437             unflushed += padding;
438             offset_ += padding;
439           }
440         }
441 
442         // write the dequeued event to the file
443         if (outEvent->eventSize_ > 0) {
444           if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
445             int errno_copy = THRIFT_ERRNO;
446             GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
447             hasIOError = true;
448             continue;
449           }
450           unflushed += outEvent->eventSize_;
451           offset_ += outEvent->eventSize_;
452         }
453       }
454       dequeueBuffer_->reset();
455     }
456 
457     if (hasIOError) {
458       continue;
459     }
460 
461     // Local variable to cache the state of forceFlush_.
462     //
463     // We only want to check the value of forceFlush_ once each time around the
464     // loop.  If we check it more than once without holding the lock the entire
465     // time, it could have changed state in between.  This will result in us
466     // making inconsistent decisions.
467     bool forced_flush = false;
468     {
469       Guard g(mutex_);
470       if (forceFlush_) {
471         if (!enqueueBuffer_->isEmpty()) {
472           // If forceFlush_ is true, we need to flush all available data.
473           // If enqueueBuffer_ is not empty, go back to the start of the loop to
474           // write it out.
475           //
476           // We know the main thread is waiting on forceFlush_ to be cleared,
477           // so no new events will be added to enqueueBuffer_ until we clear
478           // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
479           // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
480           // and clear forceFlush_ the next time around the loop.)
481           continue;
482         }
483         forced_flush = true;
484       }
485     }
486 
487     // determine if we need to perform an fsync
488     bool flush = false;
489     if (forced_flush || unflushed > flushMaxBytes_) {
490       flush = true;
491     } else {
492       if (std::chrono::steady_clock::now() > ts_next_flush) {
493         if (unflushed > 0) {
494           flush = true;
495         } else {
496           // If there is no new data since the last fsync,
497           // don't perform the fsync, but do reset the timer.
498           ts_next_flush = getNextFlushTime();
499         }
500       }
501     }
502 
503     if (flush) {
504       // sync (force flush) file to disk
505       THRIFT_FSYNC(fd_);
506       unflushed = 0;
507       ts_next_flush = getNextFlushTime();
508 
509       // notify anybody waiting for flush completion
510       if (forced_flush) {
511         Guard g(mutex_);
512         forceFlush_ = false;
513         assert(enqueueBuffer_->isEmpty());
514         assert(dequeueBuffer_->isEmpty());
515         flushed_.notifyAll();
516       }
517     }
518   }
519 }
520 
flush()521 void TFileTransport::flush() {
522   // file must be open for writing for any flushing to take place
523   if (!writerThread_.get()) {
524     return;
525   }
526   // wait for flush to take place
527   Guard g(mutex_);
528 
529   // Indicate that we are requesting a flush
530   forceFlush_ = true;
531   // Wake up the writer thread so it will perform the flush immediately
532   notEmpty_.notify();
533 
534   while (forceFlush_) {
535     flushed_.wait();
536   }
537 }
538 
readAll(uint8_t * buf,uint32_t len)539 uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
540   uint32_t have = 0;
541   uint32_t get = 0;
542 
543   while (have < len) {
544     get = read(buf + have, len - have);
545     if (get <= 0) {
546       throw TEOFException();
547     }
548     have += get;
549   }
550 
551   return have;
552 }
553 
peek()554 bool TFileTransport::peek() {
555   // check if there is an event ready to be read
556   if (!currentEvent_) {
557     currentEvent_ = readEvent();
558   }
559 
560   // did not manage to read an event from the file. This could have happened
561   // if the timeout expired or there was some other error
562   if (!currentEvent_) {
563     return false;
564   }
565 
566   // check if there is anything to read
567   return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0;
568 }
569 
read(uint8_t * buf,uint32_t len)570 uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
571   // check if there an event is ready to be read
572   if (!currentEvent_) {
573     currentEvent_ = readEvent();
574   }
575 
576   // did not manage to read an event from the file. This could have happened
577   // if the timeout expired or there was some other error
578   if (!currentEvent_) {
579     return 0;
580   }
581 
582   // read as much of the current event as possible
583   int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
584   if (remaining <= (int32_t)len) {
585     // copy over anything thats remaining
586     if (remaining > 0) {
587       memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
588     }
589     delete (currentEvent_);
590     currentEvent_ = nullptr;
591     return remaining;
592   }
593 
594   // read as much as possible
595   memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
596   currentEvent_->eventBuffPos_ += len;
597   return len;
598 }
599 
/**
 * Reads the next complete event from the file.
 *
 * The file is consumed through readBuff_ (allocated on first use). Each event
 * is a 4-byte size followed by that many payload bytes; a size of 0 is
 * treated as padding and skipped, and a size field is never started where it
 * would straddle a chunk boundary (such bytes are skipped one at a time).
 * Every non-zero size is checked with isEventCorrupted(); on a corrupt event
 * performRecovery() is invoked and parsing restarts.
 *
 * At end of file the behaviour depends on readTimeout_:
 *  - TAIL_READ_TIMEOUT: sleep for eofSleepTime_ and retry, indefinitely
 *  - NO_TAIL_READ_TIMEOUT: return nullptr immediately
 *  - a positive value: sleep once for that many milliseconds, then return
 *    nullptr if the file is still at EOF
 *
 * @return a newly allocated event that the caller is responsible for
 *         freeing, or nullptr when no event is available (see above)
 * @throws TTransportException if reading from the file fails
 */
// note caller is responsible for freeing returned events
eventInfo* TFileTransport::readEvent() {
  int readTries = 0;

  if (!readBuff_) {
    readBuff_ = new uint8_t[readBuffSize_];
  }

  while (1) {
    // read from the file if read buffer is exhausted
    if (readState_.bufferPtr_ == readState_.bufferLen_) {
      // advance the offset pointer
      offset_ += readState_.bufferLen_;
      readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
      //       if (readState_.bufferLen_) {
      //         T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
      //       }
      readState_.bufferPtr_ = 0;
      readState_.lastDispatchPtr_ = 0;

      // read error
      if (readState_.bufferLen_ == -1) {
        readState_.resetAllValues();
        GlobalOutput("TFileTransport: error while reading from file");
        throw TTransportException("TFileTransport: error while reading from file");
      } else if (readState_.bufferLen_ == 0) { // EOF
        // wait indefinitely if there is no timeout
        if (readTimeout_ == TAIL_READ_TIMEOUT) {
          THRIFT_SLEEP_USEC(eofSleepTime_);
          continue;
        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
          // reset state
          readState_.resetState(0);
          return nullptr;
        } else if (readTimeout_ > 0) {
          // timeout already expired once
          if (readTries > 0) {
            readState_.resetState(0);
            return nullptr;
          } else {
            // readTimeout_ is multiplied by 1000 to get microseconds
            THRIFT_SLEEP_USEC(readTimeout_ * 1000);
            readTries++;
            continue;
          }
        }
      }
    }

    // data was read, so a later EOF gets its own timeout attempt again
    readTries = 0;

    // attempt to read an event from the buffer
    while (readState_.bufferPtr_ < readState_.bufferLen_) {
      if (readState_.readingSize_) {
        // still collecting the 4-byte size field
        if (readState_.eventSizeBuffPos_ == 0) {
          if ((offset_ + readState_.bufferPtr_) / chunkSize_
              != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
            // skip one byte towards chunk boundary
            //            T_DEBUG_L(1, "Skipping a byte");
            readState_.bufferPtr_++;
            continue;
          }
        }

        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
            = readBuff_[readState_.bufferPtr_++];

        if (readState_.eventSizeBuffPos_ == 4) {
          if (readState_.getEventSize() == 0) {
            // 0 length event indicates padding
            //            T_DEBUG_L(1, "Got padding");
            readState_.resetState(readState_.lastDispatchPtr_);
            continue;
          }
          // got a valid event
          readState_.readingSize_ = false;
          if (readState_.event_) {
            delete (readState_.event_);
          }
          readState_.event_ = new eventInfo();
          readState_.event_->eventSize_ = readState_.getEventSize();

          // check if the event is corrupted and perform recovery if required
          if (isEventCorrupted()) {
            performRecovery();
            // start from the top
            break;
          }
        }
      } else {
        // size is known: accumulate the payload, allocating its buffer on first use
        if (!readState_.event_->eventBuff_) {
          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
          readState_.event_->eventBuffPos_ = 0;
        }
        // take either the entire event or the remaining bytes in the buffer
        int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
                                       readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);

        // copy data from read buffer into event buffer
        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
               readBuff_ + readState_.bufferPtr_,
               reclaimBuffer);

        // increment position ptrs
        readState_.event_->eventBuffPos_ += reclaimBuffer;
        readState_.bufferPtr_ += reclaimBuffer;

        // check if the event has been read in full
        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
          // set the completed event to the current event
          eventInfo* completeEvent = readState_.event_;
          // rewind so the consumer starts reading the payload from its first byte
          completeEvent->eventBuffPos_ = 0;

          // ownership moves to the caller; detach it from the read state
          readState_.event_ = nullptr;
          readState_.resetState(readState_.bufferPtr_);

          // exit criteria
          return completeEvent;
        }
      }
    }
  }
}
722 
isEventCorrupted()723 bool TFileTransport::isEventCorrupted() {
724   // an error is triggered if:
725   if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
726     // 1. Event size is larger than user-speficied max-event size
727     T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
728             readState_.event_->eventSize_,
729             maxEventSize_);
730     return true;
731   } else if (readState_.event_->eventSize_ > chunkSize_) {
732     // 2. Event size is larger than chunk size
733     T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
734             readState_.event_->eventSize_,
735             chunkSize_);
736     return true;
737   } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_)
738              != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)
739                  / chunkSize_)) {
740     // 3. size indicates that event crosses chunk boundary
741     T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u  Offset:%lu",
742             readState_.event_->eventSize_,
743             static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));
744 
745     return true;
746   }
747 
748   return false;
749 }
750 
performRecovery()751 void TFileTransport::performRecovery() {
752   // perform some kickass recovery
753   uint32_t curChunk = getCurChunk();
754   if (lastBadChunk_ == curChunk) {
755     numCorruptedEventsInChunk_++;
756   } else {
757     lastBadChunk_ = curChunk;
758     numCorruptedEventsInChunk_ = 1;
759   }
760 
761   if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
762     // maybe there was an error in reading the file from disk
763     // seek to the beginning of chunk and try again
764     seekToChunk(curChunk);
765   } else {
766 
767     // just skip ahead to the next chunk if we not already at the last chunk
768     if (curChunk != (getNumChunks() - 1)) {
769       seekToChunk(curChunk + 1);
770     } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
771       // if tailing the file, wait until there is enough data to start
772       // the next chunk
773       while (curChunk == (getNumChunks() - 1)) {
774         THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
775       }
776       seekToChunk(curChunk + 1);
777     } else {
778       // pretty hosed at this stage, rewind the file back to the last successful
779       // point and punt on the error
780       readState_.resetState(readState_.lastDispatchPtr_);
781       currentEvent_ = nullptr;
782       char errorMsg[1024];
783       sprintf(errorMsg,
784               "TFileTransport: log file corrupted at offset: %lu",
785               static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
786 
787       GlobalOutput(errorMsg);
788       throw TTransportException(errorMsg);
789     }
790   }
791 }
792 
seekToChunk(int32_t chunk)793 void TFileTransport::seekToChunk(int32_t chunk) {
794   if (fd_ <= 0) {
795     throw TTransportException("File not open");
796   }
797 
798   int32_t numChunks = getNumChunks();
799 
800   // file is empty, seeking to chunk is pointless
801   if (numChunks == 0) {
802     return;
803   }
804 
805   // negative indicates reverse seek (from the end)
806   if (chunk < 0) {
807     chunk += numChunks;
808   }
809 
810   // too large a value for reverse seek, just seek to beginning
811   if (chunk < 0) {
812     T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning...");
813     chunk = 0;
814   }
815 
816   // cannot seek past EOF
817   bool seekToEnd = false;
818   off_t minEndOffset = 0;
819   if (chunk >= numChunks) {
820     T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead...");
821     seekToEnd = true;
822     chunk = numChunks - 1;
823     // this is the min offset to process events till
824     minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END);
825   }
826 
827   off_t newOffset = off_t(chunk) * chunkSize_;
828   offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET);
829   readState_.resetAllValues();
830   currentEvent_ = nullptr;
831   if (offset_ == -1) {
832     GlobalOutput("TFileTransport: lseek error in seekToChunk");
833     throw TTransportException("TFileTransport: lseek error in seekToChunk");
834   }
835 
836   // seek to EOF if user wanted to go to last chunk
837   if (seekToEnd) {
838     uint32_t oldReadTimeout = getReadTimeout();
839     setReadTimeout(NO_TAIL_READ_TIMEOUT);
840     // keep on reading unti the last event at point of seekChunk call
841     shared_ptr<eventInfo> event;
842     while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
843       event.reset(readEvent());
844       if (event.get() == nullptr) {
845         break;
846       }
847     }
848     setReadTimeout(oldReadTimeout);
849   }
850 }
851 
seekToEnd()852 void TFileTransport::seekToEnd() {
853   seekToChunk(getNumChunks());
854 }
855 
getNumChunks()856 uint32_t TFileTransport::getNumChunks() {
857   if (fd_ <= 0) {
858     return 0;
859   }
860 
861   struct THRIFT_STAT f_info;
862   int rv = ::THRIFT_FSTAT(fd_, &f_info);
863 
864   if (rv < 0) {
865     int errno_copy = THRIFT_ERRNO;
866     throw TTransportException(TTransportException::UNKNOWN,
867                               "TFileTransport::getNumChunks() (fstat)",
868                               errno_copy);
869   }
870 
871   if (f_info.st_size > 0) {
872     size_t numChunks = ((f_info.st_size) / chunkSize_) + 1;
873     if (numChunks > (std::numeric_limits<uint32_t>::max)())
874       throw TTransportException("Too many chunks");
875     return static_cast<uint32_t>(numChunks);
876   }
877 
878   // empty file has no chunks
879   return 0;
880 }
881 
getCurChunk()882 uint32_t TFileTransport::getCurChunk() {
883   return static_cast<uint32_t>(offset_ / chunkSize_);
884 }
885 
886 // Utility Functions
openLogFile()887 void TFileTransport::openLogFile() {
888 #ifndef _WIN32
889   mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
890   int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
891 #else
892   int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
893   int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
894 #endif
895   fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode);
896   offset_ = 0;
897 
898   // make sure open call was successful
899   if (fd_ == -1) {
900     int errno_copy = THRIFT_ERRNO;
901     GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
902     throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
903   }
904 }
905 
getNextFlushTime()906 std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() {
907   return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_);
908 }
909 
TFileTransportBuffer(uint32_t size)910 TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
911   : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) {
912   buffer_ = new eventInfo* [size];
913 }
914 
~TFileTransportBuffer()915 TFileTransportBuffer::~TFileTransportBuffer() {
916   if (buffer_) {
917     for (uint32_t i = 0; i < writePoint_; i++) {
918       delete buffer_[i];
919     }
920     delete[] buffer_;
921     buffer_ = nullptr;
922   }
923 }
924 
addEvent(eventInfo * event)925 bool TFileTransportBuffer::addEvent(eventInfo* event) {
926   if (bufferMode_ == READ) {
927     GlobalOutput("Trying to write to a buffer in read mode");
928   }
929   if (writePoint_ < size_) {
930     buffer_[writePoint_++] = event;
931     return true;
932   } else {
933     // buffer is full
934     return false;
935   }
936 }
937 
getNext()938 eventInfo* TFileTransportBuffer::getNext() {
939   if (bufferMode_ == WRITE) {
940     bufferMode_ = READ;
941   }
942   if (readPoint_ < writePoint_) {
943     return buffer_[readPoint_++];
944   } else {
945     // no more entries
946     return nullptr;
947   }
948 }
949 
reset()950 void TFileTransportBuffer::reset() {
951   if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
952     T_DEBUG("%s", "Resetting a buffer with unread entries");
953   }
954   // Clean up the old entries
955   for (uint32_t i = 0; i < writePoint_; i++) {
956     delete buffer_[i];
957   }
958   bufferMode_ = WRITE;
959   writePoint_ = 0;
960   readPoint_ = 0;
961 }
962 
isFull()963 bool TFileTransportBuffer::isFull() {
964   return writePoint_ == size_;
965 }
966 
isEmpty()967 bool TFileTransportBuffer::isEmpty() {
968   return writePoint_ == 0;
969 }
970 
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport)971 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
972                                shared_ptr<TProtocolFactory> protocolFactory,
973                                shared_ptr<TFileReaderTransport> inputTransport)
974   : processor_(processor),
975     inputProtocolFactory_(protocolFactory),
976     outputProtocolFactory_(protocolFactory),
977     inputTransport_(inputTransport) {
978 
979   // default the output transport to a null transport (common case)
980   outputTransport_ = std::make_shared<TNullTransport>();
981 }
982 
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> inputProtocolFactory,shared_ptr<TProtocolFactory> outputProtocolFactory,shared_ptr<TFileReaderTransport> inputTransport)983 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
984                                shared_ptr<TProtocolFactory> inputProtocolFactory,
985                                shared_ptr<TProtocolFactory> outputProtocolFactory,
986                                shared_ptr<TFileReaderTransport> inputTransport)
987   : processor_(processor),
988     inputProtocolFactory_(inputProtocolFactory),
989     outputProtocolFactory_(outputProtocolFactory),
990     inputTransport_(inputTransport) {
991 
992   // default the output transport to a null transport (common case)
993   outputTransport_ = std::make_shared<TNullTransport>();
994 }
995 
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport,shared_ptr<TTransport> outputTransport)996 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
997                                shared_ptr<TProtocolFactory> protocolFactory,
998                                shared_ptr<TFileReaderTransport> inputTransport,
999                                shared_ptr<TTransport> outputTransport)
1000   : processor_(processor),
1001     inputProtocolFactory_(protocolFactory),
1002     outputProtocolFactory_(protocolFactory),
1003     inputTransport_(inputTransport),
1004     outputTransport_(outputTransport) {
1005 }
1006 
process(uint32_t numEvents,bool tail)1007 void TFileProcessor::process(uint32_t numEvents, bool tail) {
1008   shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1009   shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1010 
1011   // set the read timeout to 0 if tailing is required
1012   int32_t oldReadTimeout = inputTransport_->getReadTimeout();
1013   if (tail) {
1014     // save old read timeout so it can be restored
1015     inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
1016   }
1017 
1018   uint32_t numProcessed = 0;
1019   while (1) {
1020     // bad form to use exceptions for flow control but there is really
1021     // no other way around it
1022     try {
1023       processor_->process(inputProtocol, outputProtocol, nullptr);
1024       numProcessed++;
1025       if ((numEvents > 0) && (numProcessed == numEvents)) {
1026         return;
1027       }
1028     } catch (TEOFException&) {
1029       if (!tail) {
1030         break;
1031       }
1032     } catch (TException& te) {
1033       cerr << te.what() << endl;
1034       break;
1035     }
1036   }
1037 
1038   // restore old read timeout
1039   if (tail) {
1040     inputTransport_->setReadTimeout(oldReadTimeout);
1041   }
1042 }
1043 
processChunk()1044 void TFileProcessor::processChunk() {
1045   shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1046   shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1047 
1048   uint32_t curChunk = inputTransport_->getCurChunk();
1049 
1050   while (1) {
1051     // bad form to use exceptions for flow control but there is really
1052     // no other way around it
1053     try {
1054       processor_->process(inputProtocol, outputProtocol, nullptr);
1055       if (curChunk != inputTransport_->getCurChunk()) {
1056         break;
1057       }
1058     } catch (TEOFException&) {
1059       break;
1060     } catch (TException& te) {
1061       cerr << te.what() << endl;
1062       break;
1063     }
1064   }
1065 }
1066 }
1067 }
1068 } // apache::thrift::transport
1069