1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <thrift/transport/TFileTransport.h>
23 #include <thrift/transport/TTransportUtils.h>
24 #include <thrift/transport/PlatformSocket.h>
25 #include <thrift/concurrency/FunctionRunner.h>
26
27 #include <boost/version.hpp>
28
29 #ifdef HAVE_SYS_TIME_H
30 #include <sys/time.h>
31 #else
32 #include <time.h>
33 #endif
34 #include <fcntl.h>
35 #ifdef HAVE_UNISTD_H
36 #include <unistd.h>
37 #endif
38 #ifdef HAVE_STRINGS_H
39 #include <strings.h>
40 #endif
41 #include <cstdlib>
42 #include <cstring>
43 #include <iostream>
44 #include <limits>
45 #include <memory>
46 #ifdef HAVE_SYS_STAT_H
47 #include <sys/stat.h>
48 #endif
49
50 #ifdef _WIN32
51 #include <io.h>
52 #endif
53
54 namespace apache {
55 namespace thrift {
56 namespace transport {
57
58 using std::shared_ptr;
59 using std::cerr;
60 using std::cout;
61 using std::endl;
62 using std::string;
63 using namespace apache::thrift::protocol;
64 using namespace apache::thrift::concurrency;
65
/**
 * Constructs a file transport over the log file at `path`.
 *
 * Every tunable starts from its DEFAULT_* constant. The write-side event
 * buffers, the read buffer and the writer thread are all created lazily: the
 * pointers start as nullptr and bufferAndThreadInitialized_ starts false. The
 * three monitors (notFull_, notEmpty_, flushed_) share mutex_. The file itself
 * is opened immediately by openLogFile(), which throws
 * TTransportException(NOT_OPEN) if the open fails.
 *
 * @param path     log file to open (created if missing unless readOnly)
 * @param readOnly open the file read-only; write() then throws
 * @param config   transport configuration forwarded to TTransport
 */
TFileTransport::TFileTransport(string path, bool readOnly, std::shared_ptr<TConfiguration> config)
  : TTransport(config),
    readState_(),
    readBuff_(nullptr),
    currentEvent_(nullptr),
    readBuffSize_(DEFAULT_READ_BUFF_SIZE),
    readTimeout_(NO_TAIL_READ_TIMEOUT),
    chunkSize_(DEFAULT_CHUNK_SIZE),
    eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
    flushMaxUs_(DEFAULT_FLUSH_MAX_US),
    flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
    maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
    maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
    eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
    corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
    writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
    dequeueBuffer_(nullptr),
    enqueueBuffer_(nullptr),
    notFull_(&mutex_),
    notEmpty_(&mutex_),
    closing_(false),
    flushed_(&mutex_),
    forceFlush_(false),
    filename_(path),
    // 0 means "no file open" throughout this class (checks use fd_ > 0)
    fd_(0),
    bufferAndThreadInitialized_(false),
    offset_(0),
    lastBadChunk_(0),
    numCorruptedEventsInChunk_(0),
    readOnly_(readOnly) {
  // the destructor join()s writerThread_, so the thread must not be detached
  threadFactory_.setDetached(false);
  openLogFile();
}
99
resetOutputFile(int fd,string filename,off_t offset)100 void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) {
101 filename_ = filename;
102 offset_ = offset;
103
104 // check if current file is still open
105 if (fd_ > 0) {
106 // flush any events in the queue
107 flush();
108 GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
109 if (-1 == ::THRIFT_CLOSE(fd_)) {
110 int errno_copy = THRIFT_ERRNO;
111 GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
112 throw TTransportException(TTransportException::UNKNOWN,
113 "TFileTransport: error in file close",
114 errno_copy);
115 } else {
116 // successfully closed fd
117 fd_ = 0;
118 }
119 }
120
121 if (fd) {
122 fd_ = fd;
123 } else {
124 // open file if the input fd is 0
125 openLogFile();
126 }
127 }
128
~TFileTransport()129 TFileTransport::~TFileTransport() {
130 // flush the buffer if a writer thread is active
131 if (writerThread_.get()) {
132 // set state to closing
133 closing_ = true;
134
135 // wake up the writer thread
136 // Since closing_ is true, it will attempt to flush all data, then exit.
137 notEmpty_.notify();
138
139 writerThread_->join();
140 writerThread_.reset();
141 }
142
143 if (dequeueBuffer_) {
144 delete dequeueBuffer_;
145 dequeueBuffer_ = nullptr;
146 }
147
148 if (enqueueBuffer_) {
149 delete enqueueBuffer_;
150 enqueueBuffer_ = nullptr;
151 }
152
153 if (readBuff_) {
154 delete[] readBuff_;
155 readBuff_ = nullptr;
156 }
157
158 if (currentEvent_) {
159 delete currentEvent_;
160 currentEvent_ = nullptr;
161 }
162
163 // close logfile
164 if (fd_ > 0) {
165 if (-1 == ::THRIFT_CLOSE(fd_)) {
166 GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
167 } else {
168 // successfully closed fd
169 fd_ = 0;
170 }
171 }
172 }
173
initBufferAndWriteThread()174 bool TFileTransport::initBufferAndWriteThread() {
175 if (bufferAndThreadInitialized_) {
176 T_ERROR("%s", "Trying to double-init TFileTransport");
177 return false;
178 }
179
180 if (!writerThread_.get()) {
181 writerThread_ = threadFactory_.newThread(
182 apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this));
183 writerThread_->start();
184 }
185
186 dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
187 enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
188 bufferAndThreadInitialized_ = true;
189
190 return true;
191 }
192
write(const uint8_t * buf,uint32_t len)193 void TFileTransport::write(const uint8_t* buf, uint32_t len) {
194 if (readOnly_) {
195 throw TTransportException("TFileTransport: attempting to write to file opened readonly");
196 }
197
198 enqueueEvent(buf, len);
199 }
200
/**
 * std::unique_ptr deleter that releases a single object with `delete`.
 *
 * The template parameter is named `T`, not `_T`. Identifiers that start with
 * an underscore followed by an uppercase letter are reserved for the
 * implementation, and `_T` in particular is a macro in MSVC's <tchar.h>.
 */
template <class T>
struct uniqueDeleter
{
  void operator()(T* ptr) const { delete ptr; }
};
206
enqueueEvent(const uint8_t * buf,uint32_t eventLen)207 void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
208 // can't enqueue more events if file is going to close
209 if (closing_) {
210 return;
211 }
212
213 // make sure that event size is valid
214 if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
215 T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
216 return;
217 }
218
219 if (eventLen == 0) {
220 T_ERROR("%s", "cannot enqueue an empty event");
221 return;
222 }
223
224 std::unique_ptr<eventInfo, uniqueDeleter<eventInfo> > toEnqueue(new eventInfo());
225 toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4];
226
227 // first 4 bytes is the event length
228 memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
229 // actual event contents
230 memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
231 toEnqueue->eventSize_ = eventLen + 4;
232
233 // lock mutex
234 Guard g(mutex_);
235
236 // make sure that enqueue buffer is initialized and writer thread is running
237 if (!bufferAndThreadInitialized_) {
238 if (!initBufferAndWriteThread()) {
239 return;
240 }
241 }
242
243 // Can't enqueue while buffer is full
244 while (enqueueBuffer_->isFull()) {
245 notFull_.wait();
246 }
247
248 // We shouldn't be trying to enqueue new data while a forced flush is
249 // requested. (Otherwise the writer thread might not ever be able to finish
250 // the flush if more data keeps being enqueued.)
251 assert(!forceFlush_);
252
253 // add to the buffer
254 eventInfo* pEvent = toEnqueue.release();
255 if (!enqueueBuffer_->addEvent(pEvent)) {
256 delete pEvent;
257 return;
258 }
259
260 // signal anybody who's waiting for the buffer to be non-empty
261 notEmpty_.notify();
262
263 // this really should be a loop where it makes sure it got flushed
264 // because condition variables can get triggered by the os for no reason
265 // it is probably a non-factor for the time being
266 }
267
swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> * deadline)268 bool TFileTransport::swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline) {
269 bool swap;
270 Guard g(mutex_);
271
272 if (!enqueueBuffer_->isEmpty()) {
273 swap = true;
274 } else if (closing_) {
275 // even though there is no data to write,
276 // return immediately if the transport is closing
277 swap = false;
278 } else {
279 if (deadline != nullptr) {
280 // if we were handed a deadline time struct, do a timed wait
281 notEmpty_.waitForTime(*deadline);
282 } else {
283 // just wait until the buffer gets an item
284 notEmpty_.wait();
285 }
286
287 // could be empty if we timed out
288 swap = enqueueBuffer_->isEmpty();
289 }
290
291 if (swap) {
292 TFileTransportBuffer* temp = enqueueBuffer_;
293 enqueueBuffer_ = dequeueBuffer_;
294 dequeueBuffer_ = temp;
295 }
296
297 if (swap) {
298 notFull_.notify();
299 }
300
301 return swap;
302 }
303
writerThread()304 void TFileTransport::writerThread() {
305 bool hasIOError = false;
306
307 // open file if it is not open
308 if (!fd_) {
309 try {
310 openLogFile();
311 } catch (...) {
312 int errno_copy = THRIFT_ERRNO;
313 GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
314 fd_ = 0;
315 hasIOError = true;
316 }
317 }
318
319 // set the offset to the correct value (EOF)
320 if (!hasIOError) {
321 try {
322 seekToEnd();
323 // throw away any partial events
324 offset_ += readState_.lastDispatchPtr_;
325 if (0 == THRIFT_FTRUNCATE(fd_, offset_)) {
326 readState_.resetAllValues();
327 } else {
328 int errno_copy = THRIFT_ERRNO;
329 GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy);
330 hasIOError = true;
331 }
332 } catch (...) {
333 int errno_copy = THRIFT_ERRNO;
334 GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
335 hasIOError = true;
336 }
337 }
338
339 // Figure out the next time by which a flush must take place
340 auto ts_next_flush = getNextFlushTime();
341 uint32_t unflushed = 0;
342
343 while (1) {
344 // this will only be true when the destructor is being invoked
345 if (closing_) {
346 if (hasIOError) {
347 return;
348 }
349
350 // Try to empty buffers before exit
351 if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
352 ::THRIFT_FSYNC(fd_);
353 if (-1 == ::THRIFT_CLOSE(fd_)) {
354 int errno_copy = THRIFT_ERRNO;
355 GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
356 } else {
357 // fd successfully closed
358 fd_ = 0;
359 }
360 return;
361 }
362 }
363
364 if (swapEventBuffers(&ts_next_flush)) {
365 eventInfo* outEvent;
366 while (nullptr != (outEvent = dequeueBuffer_->getNext())) {
367 // Remove an event from the buffer and write it out to disk. If there is any IO error, for
368 // instance,
369 // the output file is unmounted or deleted, then this event is dropped. However, the writer
370 // thread
371 // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
372 // start writing
373 // from the end.
374
375 while (hasIOError) {
376 T_ERROR(
377 "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors",
378 writerThreadIOErrorSleepTime_);
379 THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_);
380 if (closing_) {
381 return;
382 }
383 if (!fd_) {
384 ::THRIFT_CLOSE(fd_);
385 fd_ = 0;
386 }
387 try {
388 openLogFile();
389 seekToEnd();
390 unflushed = 0;
391 hasIOError = false;
392 T_LOG_OPER(
393 "TFileTransport: log file %s reopened by writer thread during error recovery",
394 filename_.c_str());
395 } catch (...) {
396 T_ERROR("TFileTransport: unable to reopen log file %s during error recovery",
397 filename_.c_str());
398 }
399 }
400
401 // sanity check on event
402 if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
403 T_ERROR("msg size is greater than max event size: %u > %u\n",
404 outEvent->eventSize_,
405 maxEventSize_);
406 continue;
407 }
408
409 // If chunking is required, then make sure that msg does not cross chunk boundary
410 if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
411 // event size must be less than chunk size
412 if (outEvent->eventSize_ > chunkSize_) {
413 T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event",
414 outEvent->eventSize_,
415 chunkSize_);
416 continue;
417 }
418
419 int64_t chunk1 = offset_ / chunkSize_;
420 int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;
421
422 // if adding this event will cross a chunk boundary, pad the chunk with zeros
423 if (chunk1 != chunk2) {
424 // refetch the offset to keep in sync
425 offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
426 auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
427
428 auto* zeros = new uint8_t[padding];
429 memset(zeros, '\0', padding);
430 boost::scoped_array<uint8_t> array(zeros);
431 if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) {
432 int errno_copy = THRIFT_ERRNO;
433 GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ",
434 errno_copy);
435 hasIOError = true;
436 continue;
437 }
438 unflushed += padding;
439 offset_ += padding;
440 }
441 }
442
443 // write the dequeued event to the file
444 if (outEvent->eventSize_ > 0) {
445 if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
446 int errno_copy = THRIFT_ERRNO;
447 GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
448 hasIOError = true;
449 continue;
450 }
451 unflushed += outEvent->eventSize_;
452 offset_ += outEvent->eventSize_;
453 }
454 }
455 dequeueBuffer_->reset();
456 }
457
458 if (hasIOError) {
459 continue;
460 }
461
462 // Local variable to cache the state of forceFlush_.
463 //
464 // We only want to check the value of forceFlush_ once each time around the
465 // loop. If we check it more than once without holding the lock the entire
466 // time, it could have changed state in between. This will result in us
467 // making inconsistent decisions.
468 bool forced_flush = false;
469 {
470 Guard g(mutex_);
471 if (forceFlush_) {
472 if (!enqueueBuffer_->isEmpty()) {
473 // If forceFlush_ is true, we need to flush all available data.
474 // If enqueueBuffer_ is not empty, go back to the start of the loop to
475 // write it out.
476 //
477 // We know the main thread is waiting on forceFlush_ to be cleared,
478 // so no new events will be added to enqueueBuffer_ until we clear
479 // forceFlush_. Therefore the next time around the loop enqueueBuffer_
480 // is guaranteed to be empty. (I.e., we're guaranteed to make progress
481 // and clear forceFlush_ the next time around the loop.)
482 continue;
483 }
484 forced_flush = true;
485 }
486 }
487
488 // determine if we need to perform an fsync
489 bool flush = false;
490 if (forced_flush || unflushed > flushMaxBytes_) {
491 flush = true;
492 } else {
493 if (std::chrono::steady_clock::now() > ts_next_flush) {
494 if (unflushed > 0) {
495 flush = true;
496 } else {
497 // If there is no new data since the last fsync,
498 // don't perform the fsync, but do reset the timer.
499 ts_next_flush = getNextFlushTime();
500 }
501 }
502 }
503
504 if (flush) {
505 // sync (force flush) file to disk
506 THRIFT_FSYNC(fd_);
507 unflushed = 0;
508 ts_next_flush = getNextFlushTime();
509
510 // notify anybody waiting for flush completion
511 if (forced_flush) {
512 Guard g(mutex_);
513 forceFlush_ = false;
514 assert(enqueueBuffer_->isEmpty());
515 assert(dequeueBuffer_->isEmpty());
516 flushed_.notifyAll();
517 }
518 }
519 }
520 }
521
flush()522 void TFileTransport::flush() {
523 resetConsumedMessageSize();
524 // file must be open for writing for any flushing to take place
525 if (!writerThread_.get()) {
526 return;
527 }
528 // wait for flush to take place
529 Guard g(mutex_);
530
531 // Indicate that we are requesting a flush
532 forceFlush_ = true;
533 // Wake up the writer thread so it will perform the flush immediately
534 notEmpty_.notify();
535
536 while (forceFlush_) {
537 flushed_.wait();
538 }
539 }
540
readAll(uint8_t * buf,uint32_t len)541 uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
542 checkReadBytesAvailable(len);
543 uint32_t have = 0;
544 uint32_t get = 0;
545
546 while (have < len) {
547 get = read(buf + have, len - have);
548 if (get <= 0) {
549 throw TEOFException();
550 }
551 have += get;
552 }
553
554 return have;
555 }
556
peek()557 bool TFileTransport::peek() {
558 // check if there is an event ready to be read
559 if (!currentEvent_) {
560 currentEvent_ = readEvent();
561 }
562
563 // did not manage to read an event from the file. This could have happened
564 // if the timeout expired or there was some other error
565 if (!currentEvent_) {
566 return false;
567 }
568
569 // check if there is anything to read
570 return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0;
571 }
572
read(uint8_t * buf,uint32_t len)573 uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
574 checkReadBytesAvailable(len);
575 // check if there an event is ready to be read
576 if (!currentEvent_) {
577 currentEvent_ = readEvent();
578 }
579
580 // did not manage to read an event from the file. This could have happened
581 // if the timeout expired or there was some other error
582 if (!currentEvent_) {
583 return 0;
584 }
585
586 // read as much of the current event as possible
587 int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
588 if (remaining <= (int32_t)len) {
589 // copy over anything thats remaining
590 if (remaining > 0) {
591 memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
592 }
593 delete (currentEvent_);
594 currentEvent_ = nullptr;
595 return remaining;
596 }
597
598 // read as much as possible
599 memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
600 currentEvent_->eventBuffPos_ += len;
601 return len;
602 }
603
// note caller is responsible for freeing returned events
/**
 * Reads the next complete event from the log file.
 *
 * Data is pulled from fd_ into readBuff_ (readBuffSize_ bytes, allocated on
 * first use) and parsed with the state kept in readState_. A partially parsed
 * event therefore survives buffer refills. Each record is a 4-byte size
 * followed by that many payload bytes. A size of 0 marks zero padding. A size
 * prefix whose 4 bytes would straddle a chunk boundary is never accepted; those
 * bytes are skipped one at a time.
 *
 * Behaviour at EOF depends on readTimeout_:
 *  - TAIL_READ_TIMEOUT: sleep eofSleepTime_ and keep polling for more data;
 *  - NO_TAIL_READ_TIMEOUT: return nullptr immediately;
 *  - > 0: sleep once (readTimeout_ * 1000 us), retry, then return nullptr.
 *
 * @return a heap-allocated event with eventBuffPos_ reset to 0, which the
 *         caller must delete; or nullptr as described above.
 * @throws TTransportException on a read error. performRecovery() may also
 *         throw when a corrupted event cannot be recovered.
 */
eventInfo* TFileTransport::readEvent() {
  // consecutive empty reads at EOF; only consulted when readTimeout_ > 0
  int readTries = 0;

  // the read buffer is allocated lazily on first use
  if (!readBuff_) {
    readBuff_ = new uint8_t[readBuffSize_];
  }

  while (1) {
    // read from the file if read buffer is exhausted
    if (readState_.bufferPtr_ == readState_.bufferLen_) {
      // advance the offset pointer: offset_ tracks the file position of
      // readBuff_[0], so offset_ + bufferPtr_ is an absolute file offset
      offset_ += readState_.bufferLen_;
      readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
      // if (readState_.bufferLen_) {
      //   T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
      // }
      readState_.bufferPtr_ = 0;
      readState_.lastDispatchPtr_ = 0;

      // read error (THRIFT_READ returned -1)
      if (readState_.bufferLen_ == -1) {
        readState_.resetAllValues();
        GlobalOutput("TFileTransport: error while reading from file");
        throw TTransportException("TFileTransport: error while reading from file");
      } else if (readState_.bufferLen_ == 0) { // EOF
        // wait indefinitely if there is no timeout
        if (readTimeout_ == TAIL_READ_TIMEOUT) {
          THRIFT_SLEEP_USEC(eofSleepTime_);
          continue;
        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
          // reset state
          readState_.resetState(0);
          return nullptr;
        } else if (readTimeout_ > 0) {
          // timeout already expired once
          if (readTries > 0) {
            readState_.resetState(0);
            return nullptr;
          } else {
            // presumably readTimeout_ is in milliseconds (scaled to microseconds here)
            THRIFT_SLEEP_USEC(readTimeout_ * 1000);
            readTries++;
            continue;
          }
        }
        // NOTE(review): any other negative readTimeout_ falls through; the loop
        // then re-reads immediately without sleeping (busy polling at EOF).
      }
    }

    // reset the EOF retry counter now that we are about to parse
    readTries = 0;

    // attempt to read an event from the buffer
    while (readState_.bufferPtr_ < readState_.bufferLen_) {
      if (readState_.readingSize_) {
        // phase 1: collect the 4-byte size prefix one byte at a time
        if (readState_.eventSizeBuffPos_ == 0) {
          // The writer pads rather than let an event cross a chunk boundary, so
          // 4 bytes starting here that would straddle one cannot be a size prefix.
          if ((offset_ + readState_.bufferPtr_) / chunkSize_
              != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
            // skip one byte towards chunk boundary
            //            T_DEBUG_L(1, "Skipping a byte");
            readState_.bufferPtr_++;
            continue;
          }
        }

        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++]
            = readBuff_[readState_.bufferPtr_++];

        if (readState_.eventSizeBuffPos_ == 4) {
          if (readState_.getEventSize() == 0) {
            // 0 length event indicates padding
            //            T_DEBUG_L(1, "Got padding");
            readState_.resetState(readState_.lastDispatchPtr_);
            continue;
          }
          // got a valid event
          readState_.readingSize_ = false;
          if (readState_.event_) {
            delete (readState_.event_);
          }
          readState_.event_ = new eventInfo();
          readState_.event_->eventSize_ = readState_.getEventSize();

          // check if the event is corrupted and perform recovery if required
          if (isEventCorrupted()) {
            performRecovery();
            // start from the top
            break;
          }
        }
      } else {
        // phase 2: copy payload bytes into the event until it is complete
        if (!readState_.event_->eventBuff_) {
          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
          readState_.event_->eventBuffPos_ = 0;
        }
        // take either the entire event or the remaining bytes in the buffer
        // ((std::min) in parentheses sidesteps a function-like min macro)
        int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
                                       readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);

        // copy data from read buffer into event buffer
        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
               readBuff_ + readState_.bufferPtr_,
               reclaimBuffer);

        // increment position ptrs
        readState_.event_->eventBuffPos_ += reclaimBuffer;
        readState_.bufferPtr_ += reclaimBuffer;

        // check if the event has been read in full
        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
          // set the completed event to the current event
          eventInfo* completeEvent = readState_.event_;
          // rewind so the consumer reads the payload from its start
          completeEvent->eventBuffPos_ = 0;

          // ownership moves to the caller; the parser state forgets the event
          readState_.event_ = nullptr;
          readState_.resetState(readState_.bufferPtr_);

          // exit criteria
          return completeEvent;
        }
      }
    }
  }
}
726
isEventCorrupted()727 bool TFileTransport::isEventCorrupted() {
728 // an error is triggered if:
729 if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
730 // 1. Event size is larger than user-speficied max-event size
731 T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
732 readState_.event_->eventSize_,
733 maxEventSize_);
734 return true;
735 } else if (readState_.event_->eventSize_ > chunkSize_) {
736 // 2. Event size is larger than chunk size
737 T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
738 readState_.event_->eventSize_,
739 chunkSize_);
740 return true;
741 } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_)
742 != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)
743 / chunkSize_)) {
744 // 3. size indicates that event crosses chunk boundary
745 T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu",
746 readState_.event_->eventSize_,
747 static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));
748
749 return true;
750 }
751
752 return false;
753 }
754
performRecovery()755 void TFileTransport::performRecovery() {
756 // perform some kickass recovery
757 uint32_t curChunk = getCurChunk();
758 if (lastBadChunk_ == curChunk) {
759 numCorruptedEventsInChunk_++;
760 } else {
761 lastBadChunk_ = curChunk;
762 numCorruptedEventsInChunk_ = 1;
763 }
764
765 if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
766 // maybe there was an error in reading the file from disk
767 // seek to the beginning of chunk and try again
768 seekToChunk(curChunk);
769 } else {
770
771 // just skip ahead to the next chunk if we not already at the last chunk
772 if (curChunk != (getNumChunks() - 1)) {
773 seekToChunk(curChunk + 1);
774 } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
775 // if tailing the file, wait until there is enough data to start
776 // the next chunk
777 while (curChunk == (getNumChunks() - 1)) {
778 THRIFT_SLEEP_USEC(corruptedEventSleepTime_);
779 }
780 seekToChunk(curChunk + 1);
781 } else {
782 // pretty hosed at this stage, rewind the file back to the last successful
783 // point and punt on the error
784 readState_.resetState(readState_.lastDispatchPtr_);
785 currentEvent_ = nullptr;
786 char errorMsg[1024];
787 sprintf(errorMsg,
788 "TFileTransport: log file corrupted at offset: %lu",
789 static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
790
791 GlobalOutput(errorMsg);
792 throw TTransportException(errorMsg);
793 }
794 }
795 }
796
seekToChunk(int32_t chunk)797 void TFileTransport::seekToChunk(int32_t chunk) {
798 if (fd_ <= 0) {
799 throw TTransportException("File not open");
800 }
801
802 int32_t numChunks = getNumChunks();
803
804 // file is empty, seeking to chunk is pointless
805 if (numChunks == 0) {
806 return;
807 }
808
809 // negative indicates reverse seek (from the end)
810 if (chunk < 0) {
811 chunk += numChunks;
812 }
813
814 // too large a value for reverse seek, just seek to beginning
815 if (chunk < 0) {
816 T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning...");
817 chunk = 0;
818 }
819
820 // cannot seek past EOF
821 bool seekToEnd = false;
822 off_t minEndOffset = 0;
823 if (chunk >= numChunks) {
824 T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead...");
825 seekToEnd = true;
826 chunk = numChunks - 1;
827 // this is the min offset to process events till
828 minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END);
829 }
830
831 off_t newOffset = off_t(chunk) * chunkSize_;
832 offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET);
833 readState_.resetAllValues();
834 currentEvent_ = nullptr;
835 if (offset_ == -1) {
836 GlobalOutput("TFileTransport: lseek error in seekToChunk");
837 throw TTransportException("TFileTransport: lseek error in seekToChunk");
838 }
839
840 // seek to EOF if user wanted to go to last chunk
841 if (seekToEnd) {
842 uint32_t oldReadTimeout = getReadTimeout();
843 setReadTimeout(NO_TAIL_READ_TIMEOUT);
844 // keep on reading unti the last event at point of seekChunk call
845 shared_ptr<eventInfo> event;
846 while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
847 event.reset(readEvent());
848 if (event.get() == nullptr) {
849 break;
850 }
851 }
852 setReadTimeout(oldReadTimeout);
853 }
854 }
855
seekToEnd()856 void TFileTransport::seekToEnd() {
857 seekToChunk(getNumChunks());
858 }
859
getNumChunks()860 uint32_t TFileTransport::getNumChunks() {
861 if (fd_ <= 0) {
862 return 0;
863 }
864
865 struct THRIFT_STAT f_info;
866 int rv = ::THRIFT_FSTAT(fd_, &f_info);
867
868 if (rv < 0) {
869 int errno_copy = THRIFT_ERRNO;
870 throw TTransportException(TTransportException::UNKNOWN,
871 "TFileTransport::getNumChunks() (fstat)",
872 errno_copy);
873 }
874
875 if (f_info.st_size > 0) {
876 size_t numChunks = ((f_info.st_size) / chunkSize_) + 1;
877 if (numChunks > (std::numeric_limits<uint32_t>::max)())
878 throw TTransportException("Too many chunks");
879 return static_cast<uint32_t>(numChunks);
880 }
881
882 // empty file has no chunks
883 return 0;
884 }
885
getCurChunk()886 uint32_t TFileTransport::getCurChunk() {
887 return static_cast<uint32_t>(offset_ / chunkSize_);
888 }
889
890 // Utility Functions
openLogFile()891 void TFileTransport::openLogFile() {
892 #ifndef _WIN32
893 mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
894 int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
895 #else
896 int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
897 int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
898 #endif
899 fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode);
900 offset_ = 0;
901
902 // make sure open call was successful
903 if (fd_ == -1) {
904 int errno_copy = THRIFT_ERRNO;
905 GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
906 throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
907 }
908 }
909
getNextFlushTime()910 std::chrono::time_point<std::chrono::steady_clock> TFileTransport::getNextFlushTime() {
911 return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_);
912 }
913
TFileTransportBuffer(uint32_t size)914 TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
915 : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) {
916 buffer_ = new eventInfo* [size];
917 }
918
~TFileTransportBuffer()919 TFileTransportBuffer::~TFileTransportBuffer() {
920 if (buffer_) {
921 for (uint32_t i = 0; i < writePoint_; i++) {
922 delete buffer_[i];
923 }
924 delete[] buffer_;
925 buffer_ = nullptr;
926 }
927 }
928
addEvent(eventInfo * event)929 bool TFileTransportBuffer::addEvent(eventInfo* event) {
930 if (bufferMode_ == READ) {
931 GlobalOutput("Trying to write to a buffer in read mode");
932 }
933 if (writePoint_ < size_) {
934 buffer_[writePoint_++] = event;
935 return true;
936 } else {
937 // buffer is full
938 return false;
939 }
940 }
941
getNext()942 eventInfo* TFileTransportBuffer::getNext() {
943 if (bufferMode_ == WRITE) {
944 bufferMode_ = READ;
945 }
946 if (readPoint_ < writePoint_) {
947 return buffer_[readPoint_++];
948 } else {
949 // no more entries
950 return nullptr;
951 }
952 }
953
reset()954 void TFileTransportBuffer::reset() {
955 if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
956 T_DEBUG("%s", "Resetting a buffer with unread entries");
957 }
958 // Clean up the old entries
959 for (uint32_t i = 0; i < writePoint_; i++) {
960 delete buffer_[i];
961 }
962 bufferMode_ = WRITE;
963 writePoint_ = 0;
964 readPoint_ = 0;
965 }
966
isFull()967 bool TFileTransportBuffer::isFull() {
968 return writePoint_ == size_;
969 }
970
isEmpty()971 bool TFileTransportBuffer::isEmpty() {
972 return writePoint_ == 0;
973 }
974
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport)975 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
976 shared_ptr<TProtocolFactory> protocolFactory,
977 shared_ptr<TFileReaderTransport> inputTransport)
978 : processor_(processor),
979 inputProtocolFactory_(protocolFactory),
980 outputProtocolFactory_(protocolFactory),
981 inputTransport_(inputTransport) {
982
983 // default the output transport to a null transport (common case)
984 outputTransport_ = std::make_shared<TNullTransport>();
985 }
986
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> inputProtocolFactory,shared_ptr<TProtocolFactory> outputProtocolFactory,shared_ptr<TFileReaderTransport> inputTransport)987 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
988 shared_ptr<TProtocolFactory> inputProtocolFactory,
989 shared_ptr<TProtocolFactory> outputProtocolFactory,
990 shared_ptr<TFileReaderTransport> inputTransport)
991 : processor_(processor),
992 inputProtocolFactory_(inputProtocolFactory),
993 outputProtocolFactory_(outputProtocolFactory),
994 inputTransport_(inputTransport) {
995
996 // default the output transport to a null transport (common case)
997 outputTransport_ = std::make_shared<TNullTransport>();
998 }
999
TFileProcessor(shared_ptr<TProcessor> processor,shared_ptr<TProtocolFactory> protocolFactory,shared_ptr<TFileReaderTransport> inputTransport,shared_ptr<TTransport> outputTransport)1000 TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
1001 shared_ptr<TProtocolFactory> protocolFactory,
1002 shared_ptr<TFileReaderTransport> inputTransport,
1003 shared_ptr<TTransport> outputTransport)
1004 : processor_(processor),
1005 inputProtocolFactory_(protocolFactory),
1006 outputProtocolFactory_(protocolFactory),
1007 inputTransport_(inputTransport),
1008 outputTransport_(outputTransport) {
1009 }
1010
process(uint32_t numEvents,bool tail)1011 void TFileProcessor::process(uint32_t numEvents, bool tail) {
1012 shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1013 shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1014
1015 // set the read timeout to 0 if tailing is required
1016 int32_t oldReadTimeout = inputTransport_->getReadTimeout();
1017 if (tail) {
1018 // save old read timeout so it can be restored
1019 inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
1020 }
1021
1022 uint32_t numProcessed = 0;
1023 while (1) {
1024 // bad form to use exceptions for flow control but there is really
1025 // no other way around it
1026 try {
1027 processor_->process(inputProtocol, outputProtocol, nullptr);
1028 numProcessed++;
1029 if ((numEvents > 0) && (numProcessed == numEvents)) {
1030 return;
1031 }
1032 } catch (TEOFException&) {
1033 if (!tail) {
1034 break;
1035 }
1036 } catch (TException& te) {
1037 cerr << te.what() << endl;
1038 break;
1039 }
1040 }
1041
1042 // restore old read timeout
1043 if (tail) {
1044 inputTransport_->setReadTimeout(oldReadTimeout);
1045 }
1046 }
1047
processChunk()1048 void TFileProcessor::processChunk() {
1049 shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
1050 shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
1051
1052 uint32_t curChunk = inputTransport_->getCurChunk();
1053
1054 while (1) {
1055 // bad form to use exceptions for flow control but there is really
1056 // no other way around it
1057 try {
1058 processor_->process(inputProtocol, outputProtocol, nullptr);
1059 if (curChunk != inputTransport_->getCurChunk()) {
1060 break;
1061 }
1062 } catch (TEOFException&) {
1063 break;
1064 } catch (TException& te) {
1065 cerr << te.what() << endl;
1066 break;
1067 }
1068 }
1069 }
1070 }
1071 }
1072 } // apache::thrift::transport
1073