/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #ifdef HAVE_SYS_TIME_H #include #else #include #endif #include #ifdef HAVE_UNISTD_H #include #endif #ifdef HAVE_STRINGS_H #include #endif #include #include #include #include #include #ifdef HAVE_SYS_STAT_H #include #endif #ifdef _WIN32 #include #endif namespace apache { namespace thrift { namespace transport { using std::shared_ptr; using std::cerr; using std::cout; using std::endl; using std::string; using namespace apache::thrift::protocol; using namespace apache::thrift::concurrency; TFileTransport::TFileTransport(string path, bool readOnly) : readState_(), readBuff_(nullptr), currentEvent_(nullptr), readBuffSize_(DEFAULT_READ_BUFF_SIZE), readTimeout_(NO_TAIL_READ_TIMEOUT), chunkSize_(DEFAULT_CHUNK_SIZE), eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE), flushMaxUs_(DEFAULT_FLUSH_MAX_US), flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES), maxEventSize_(DEFAULT_MAX_EVENT_SIZE), maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS), eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US), corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US), writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US), dequeueBuffer_(nullptr), enqueueBuffer_(nullptr), notFull_(&mutex_), notEmpty_(&mutex_), closing_(false), flushed_(&mutex_), forceFlush_(false), filename_(path), fd_(0), bufferAndThreadInitialized_(false), offset_(0), lastBadChunk_(0), numCorruptedEventsInChunk_(0), readOnly_(readOnly) { threadFactory_.setDetached(false); openLogFile(); } void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) { filename_ = filename; offset_ = offset; // check if current file is still open if (fd_ > 0) { // flush any events in the queue flush(); GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str()); if (-1 == ::THRIFT_CLOSE(fd_)) { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy); throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy); } else { // successfully closed fd fd_ = 0; } } if (fd) { fd_ = fd; } else { // open file if the input fd is 0 openLogFile(); } } TFileTransport::~TFileTransport() { // flush the buffer if a writer thread is active if (writerThread_.get()) { // set state to closing closing_ = true; // wake up the writer thread // Since closing_ is true, it will attempt to flush all data, then exit. notEmpty_.notify(); writerThread_->join(); writerThread_.reset(); } if (dequeueBuffer_) { delete dequeueBuffer_; dequeueBuffer_ = nullptr; } if (enqueueBuffer_) { delete enqueueBuffer_; enqueueBuffer_ = nullptr; } if (readBuff_) { delete[] readBuff_; readBuff_ = nullptr; } if (currentEvent_) { delete currentEvent_; currentEvent_ = nullptr; } // close logfile if (fd_ > 0) { if (-1 == ::THRIFT_CLOSE(fd_)) { GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO); } else { // successfully closed fd fd_ = 0; } } } bool TFileTransport::initBufferAndWriteThread() { if (bufferAndThreadInitialized_) { T_ERROR("%s", "Trying to double-init TFileTransport"); return false; } if (!writerThread_.get()) { writerThread_ = threadFactory_.newThread( apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this)); writerThread_->start(); } dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); bufferAndThreadInitialized_ = true; return true; } void TFileTransport::write(const uint8_t* buf, uint32_t len) { if (readOnly_) { throw TTransportException("TFileTransport: attempting to write to file opened readonly"); } enqueueEvent(buf, len); } template struct uniqueDeleter { void operator()(_T *ptr) const { delete ptr; } }; void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { // can't enqueue more events if file is going to close if (closing_) { return; } // make sure that event size is valid if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) { T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_); return; } if (eventLen == 0) { T_ERROR("%s", "cannot enqueue an empty event"); return; } std::unique_ptr > toEnqueue(new eventInfo()); toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4]; // first 4 bytes is the event length memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4); // actual event contents memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen); toEnqueue->eventSize_ = eventLen + 4; // lock mutex Guard g(mutex_); // make sure that enqueue buffer is initialized and writer thread is running if (!bufferAndThreadInitialized_) { if (!initBufferAndWriteThread()) { return; } } // Can't enqueue while buffer is full while (enqueueBuffer_->isFull()) { notFull_.wait(); } // We shouldn't be trying to enqueue new data while a forced flush is // requested. (Otherwise the writer thread might not ever be able to finish // the flush if more data keeps being enqueued.) assert(!forceFlush_); // add to the buffer eventInfo* pEvent = toEnqueue.release(); if (!enqueueBuffer_->addEvent(pEvent)) { delete pEvent; return; } // signal anybody who's waiting for the buffer to be non-empty notEmpty_.notify(); // this really should be a loop where it makes sure it got flushed // because condition variables can get triggered by the os for no reason // it is probably a non-factor for the time being } bool TFileTransport::swapEventBuffers(const std::chrono::time_point *deadline) { bool swap; Guard g(mutex_); if (!enqueueBuffer_->isEmpty()) { swap = true; } else if (closing_) { // even though there is no data to write, // return immediately if the transport is closing swap = false; } else { if (deadline != nullptr) { // if we were handed a deadline time struct, do a timed wait notEmpty_.waitForTime(*deadline); } else { // just wait until the buffer gets an item notEmpty_.wait(); } // could be empty if we timed out swap = enqueueBuffer_->isEmpty(); } if (swap) { TFileTransportBuffer* temp = enqueueBuffer_; enqueueBuffer_ = dequeueBuffer_; dequeueBuffer_ = temp; } if (swap) { notFull_.notify(); } return swap; } void TFileTransport::writerThread() { bool hasIOError = false; // open file if it is not open if (!fd_) { try { openLogFile(); } catch (...) { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy); fd_ = 0; hasIOError = true; } } // set the offset to the correct value (EOF) if (!hasIOError) { try { seekToEnd(); // throw away any partial events offset_ += readState_.lastDispatchPtr_; if (0 == THRIFT_FTRUNCATE(fd_, offset_)) { readState_.resetAllValues(); } else { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: writerThread() truncate ", errno_copy); hasIOError = true; } } catch (...) { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy); hasIOError = true; } } // Figure out the next time by which a flush must take place auto ts_next_flush = getNextFlushTime(); uint32_t unflushed = 0; while (1) { // this will only be true when the destructor is being invoked if (closing_) { if (hasIOError) { return; } // Try to empty buffers before exit if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) { ::THRIFT_FSYNC(fd_); if (-1 == ::THRIFT_CLOSE(fd_)) { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy); } else { // fd successfully closed fd_ = 0; } return; } } if (swapEventBuffers(&ts_next_flush)) { eventInfo* outEvent; while (nullptr != (outEvent = dequeueBuffer_->getNext())) { // Remove an event from the buffer and write it out to disk. If there is any IO error, for // instance, // the output file is unmounted or deleted, then this event is dropped. However, the writer // thread // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then // start writing // from the end. while (hasIOError) { T_ERROR( "TFileTransport: writer thread going to sleep for %u microseconds due to IO errors", writerThreadIOErrorSleepTime_); THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_); if (closing_) { return; } if (!fd_) { ::THRIFT_CLOSE(fd_); fd_ = 0; } try { openLogFile(); seekToEnd(); unflushed = 0; hasIOError = false; T_LOG_OPER( "TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str()); } catch (...) { T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str()); } } // sanity check on event if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_); continue; } // If chunking is required, then make sure that msg does not cross chunk boundary if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { // event size must be less than chunk size if (outEvent->eventSize_ > chunkSize_) { T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_); continue; } int64_t chunk1 = offset_ / chunkSize_; int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_; // if adding this event will cross a chunk boundary, pad the chunk with zeros if (chunk1 != chunk2) { // refetch the offset to keep in sync offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR); auto padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_); auto* zeros = new uint8_t[padding]; memset(zeros, '\0', padding); boost::scoped_array array(zeros); if (-1 == ::THRIFT_WRITE(fd_, zeros, padding)) { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy); hasIOError = true; continue; } unflushed += padding; offset_ += padding; } } // write the dequeued event to the file if (outEvent->eventSize_ > 0) { if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy); hasIOError = true; continue; } unflushed += outEvent->eventSize_; offset_ += outEvent->eventSize_; } } dequeueBuffer_->reset(); } if (hasIOError) { continue; } // Local variable to cache the state of forceFlush_. // // We only want to check the value of forceFlush_ once each time around the // loop. If we check it more than once without holding the lock the entire // time, it could have changed state in between. This will result in us // making inconsistent decisions. bool forced_flush = false; { Guard g(mutex_); if (forceFlush_) { if (!enqueueBuffer_->isEmpty()) { // If forceFlush_ is true, we need to flush all available data. // If enqueueBuffer_ is not empty, go back to the start of the loop to // write it out. // // We know the main thread is waiting on forceFlush_ to be cleared, // so no new events will be added to enqueueBuffer_ until we clear // forceFlush_. Therefore the next time around the loop enqueueBuffer_ // is guaranteed to be empty. (I.e., we're guaranteed to make progress // and clear forceFlush_ the next time around the loop.) continue; } forced_flush = true; } } // determine if we need to perform an fsync bool flush = false; if (forced_flush || unflushed > flushMaxBytes_) { flush = true; } else { if (std::chrono::steady_clock::now() > ts_next_flush) { if (unflushed > 0) { flush = true; } else { // If there is no new data since the last fsync, // don't perform the fsync, but do reset the timer. ts_next_flush = getNextFlushTime(); } } } if (flush) { // sync (force flush) file to disk THRIFT_FSYNC(fd_); unflushed = 0; ts_next_flush = getNextFlushTime(); // notify anybody waiting for flush completion if (forced_flush) { Guard g(mutex_); forceFlush_ = false; assert(enqueueBuffer_->isEmpty()); assert(dequeueBuffer_->isEmpty()); flushed_.notifyAll(); } } } } void TFileTransport::flush() { // file must be open for writing for any flushing to take place if (!writerThread_.get()) { return; } // wait for flush to take place Guard g(mutex_); // Indicate that we are requesting a flush forceFlush_ = true; // Wake up the writer thread so it will perform the flush immediately notEmpty_.notify(); while (forceFlush_) { flushed_.wait(); } } uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) { uint32_t have = 0; uint32_t get = 0; while (have < len) { get = read(buf + have, len - have); if (get <= 0) { throw TEOFException(); } have += get; } return have; } bool TFileTransport::peek() { // check if there is an event ready to be read if (!currentEvent_) { currentEvent_ = readEvent(); } // did not manage to read an event from the file. This could have happened // if the timeout expired or there was some other error if (!currentEvent_) { return false; } // check if there is anything to read return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0; } uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) { // check if there an event is ready to be read if (!currentEvent_) { currentEvent_ = readEvent(); } // did not manage to read an event from the file. This could have happened // if the timeout expired or there was some other error if (!currentEvent_) { return 0; } // read as much of the current event as possible int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_; if (remaining <= (int32_t)len) { // copy over anything thats remaining if (remaining > 0) { memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining); } delete (currentEvent_); currentEvent_ = nullptr; return remaining; } // read as much as possible memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len); currentEvent_->eventBuffPos_ += len; return len; } // note caller is responsible for freeing returned events eventInfo* TFileTransport::readEvent() { int readTries = 0; if (!readBuff_) { readBuff_ = new uint8_t[readBuffSize_]; } while (1) { // read from the file if read buffer is exhausted if (readState_.bufferPtr_ == readState_.bufferLen_) { // advance the offset pointer offset_ += readState_.bufferLen_; readState_.bufferLen_ = static_cast(::THRIFT_READ(fd_, readBuff_, readBuffSize_)); // if (readState_.bufferLen_) { // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_); // } readState_.bufferPtr_ = 0; readState_.lastDispatchPtr_ = 0; // read error if (readState_.bufferLen_ == -1) { readState_.resetAllValues(); GlobalOutput("TFileTransport: error while reading from file"); throw TTransportException("TFileTransport: error while reading from file"); } else if (readState_.bufferLen_ == 0) { // EOF // wait indefinitely if there is no timeout if (readTimeout_ == TAIL_READ_TIMEOUT) { THRIFT_SLEEP_USEC(eofSleepTime_); continue; } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) { // reset state readState_.resetState(0); return nullptr; } else if (readTimeout_ > 0) { // timeout already expired once if (readTries > 0) { readState_.resetState(0); return nullptr; } else { THRIFT_SLEEP_USEC(readTimeout_ * 1000); readTries++; continue; } } } } readTries = 0; // attempt to read an event from the buffer while (readState_.bufferPtr_ < readState_.bufferLen_) { if (readState_.readingSize_) { if (readState_.eventSizeBuffPos_ == 0) { if ((offset_ + readState_.bufferPtr_) / chunkSize_ != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) { // skip one byte towards chunk boundary // T_DEBUG_L(1, "Skipping a byte"); readState_.bufferPtr_++; continue; } } readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = readBuff_[readState_.bufferPtr_++]; if (readState_.eventSizeBuffPos_ == 4) { if (readState_.getEventSize() == 0) { // 0 length event indicates padding // T_DEBUG_L(1, "Got padding"); readState_.resetState(readState_.lastDispatchPtr_); continue; } // got a valid event readState_.readingSize_ = false; if (readState_.event_) { delete (readState_.event_); } readState_.event_ = new eventInfo(); readState_.event_->eventSize_ = readState_.getEventSize(); // check if the event is corrupted and perform recovery if required if (isEventCorrupted()) { performRecovery(); // start from the top break; } } } else { if (!readState_.event_->eventBuff_) { readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_]; readState_.event_->eventBuffPos_ = 0; } // take either the entire event or the remaining bytes in the buffer int reclaimBuffer = (std::min)((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_), readState_.event_->eventSize_ - readState_.event_->eventBuffPos_); // copy data from read buffer into event buffer memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_, readBuff_ + readState_.bufferPtr_, reclaimBuffer); // increment position ptrs readState_.event_->eventBuffPos_ += reclaimBuffer; readState_.bufferPtr_ += reclaimBuffer; // check if the event has been read in full if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) { // set the completed event to the current event eventInfo* completeEvent = readState_.event_; completeEvent->eventBuffPos_ = 0; readState_.event_ = nullptr; readState_.resetState(readState_.bufferPtr_); // exit criteria return completeEvent; } } } } } bool TFileTransport::isEventCorrupted() { // an error is triggered if: if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) { // 1. Event size is larger than user-speficied max-event size T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)", readState_.event_->eventSize_, maxEventSize_); return true; } else if (readState_.event_->eventSize_ > chunkSize_) { // 2. Event size is larger than chunk size T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)", readState_.event_->eventSize_, chunkSize_); return true; } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_) != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1) / chunkSize_)) { // 3. size indicates that event crosses chunk boundary T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu", readState_.event_->eventSize_, static_cast(offset_ + readState_.bufferPtr_ + 4)); return true; } return false; } void TFileTransport::performRecovery() { // perform some kickass recovery uint32_t curChunk = getCurChunk(); if (lastBadChunk_ == curChunk) { numCorruptedEventsInChunk_++; } else { lastBadChunk_ = curChunk; numCorruptedEventsInChunk_ = 1; } if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) { // maybe there was an error in reading the file from disk // seek to the beginning of chunk and try again seekToChunk(curChunk); } else { // just skip ahead to the next chunk if we not already at the last chunk if (curChunk != (getNumChunks() - 1)) { seekToChunk(curChunk + 1); } else if (readTimeout_ == TAIL_READ_TIMEOUT) { // if tailing the file, wait until there is enough data to start // the next chunk while (curChunk == (getNumChunks() - 1)) { THRIFT_SLEEP_USEC(corruptedEventSleepTime_); } seekToChunk(curChunk + 1); } else { // pretty hosed at this stage, rewind the file back to the last successful // point and punt on the error readState_.resetState(readState_.lastDispatchPtr_); currentEvent_ = nullptr; char errorMsg[1024]; sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", static_cast(offset_ + readState_.lastDispatchPtr_)); GlobalOutput(errorMsg); throw TTransportException(errorMsg); } } } void TFileTransport::seekToChunk(int32_t chunk) { if (fd_ <= 0) { throw TTransportException("File not open"); } int32_t numChunks = getNumChunks(); // file is empty, seeking to chunk is pointless if (numChunks == 0) { return; } // negative indicates reverse seek (from the end) if (chunk < 0) { chunk += numChunks; } // too large a value for reverse seek, just seek to beginning if (chunk < 0) { T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning..."); chunk = 0; } // cannot seek past EOF bool seekToEnd = false; off_t minEndOffset = 0; if (chunk >= numChunks) { T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead..."); seekToEnd = true; chunk = numChunks - 1; // this is the min offset to process events till minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END); } off_t newOffset = off_t(chunk) * chunkSize_; offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET); readState_.resetAllValues(); currentEvent_ = nullptr; if (offset_ == -1) { GlobalOutput("TFileTransport: lseek error in seekToChunk"); throw TTransportException("TFileTransport: lseek error in seekToChunk"); } // seek to EOF if user wanted to go to last chunk if (seekToEnd) { uint32_t oldReadTimeout = getReadTimeout(); setReadTimeout(NO_TAIL_READ_TIMEOUT); // keep on reading unti the last event at point of seekChunk call shared_ptr event; while ((offset_ + readState_.bufferPtr_) < minEndOffset) { event.reset(readEvent()); if (event.get() == nullptr) { break; } } setReadTimeout(oldReadTimeout); } } void TFileTransport::seekToEnd() { seekToChunk(getNumChunks()); } uint32_t TFileTransport::getNumChunks() { if (fd_ <= 0) { return 0; } struct THRIFT_STAT f_info; int rv = ::THRIFT_FSTAT(fd_, &f_info); if (rv < 0) { int errno_copy = THRIFT_ERRNO; throw TTransportException(TTransportException::UNKNOWN, "TFileTransport::getNumChunks() (fstat)", errno_copy); } if (f_info.st_size > 0) { size_t numChunks = ((f_info.st_size) / chunkSize_) + 1; if (numChunks > (std::numeric_limits::max)()) throw TTransportException("Too many chunks"); return static_cast(numChunks); } // empty file has no chunks return 0; } uint32_t TFileTransport::getCurChunk() { return static_cast(offset_ / chunkSize_); } // Utility Functions void TFileTransport::openLogFile() { #ifndef _WIN32 mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND; #else int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE; int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND; #endif fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode); offset_ = 0; // make sure open call was successful if (fd_ == -1) { int errno_copy = THRIFT_ERRNO; GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy); throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy); } } std::chrono::time_point TFileTransport::getNextFlushTime() { return std::chrono::steady_clock::now() + std::chrono::microseconds(flushMaxUs_); } TFileTransportBuffer::TFileTransportBuffer(uint32_t size) : bufferMode_(WRITE), writePoint_(0), readPoint_(0), size_(size) { buffer_ = new eventInfo* [size]; } TFileTransportBuffer::~TFileTransportBuffer() { if (buffer_) { for (uint32_t i = 0; i < writePoint_; i++) { delete buffer_[i]; } delete[] buffer_; buffer_ = nullptr; } } bool TFileTransportBuffer::addEvent(eventInfo* event) { if (bufferMode_ == READ) { GlobalOutput("Trying to write to a buffer in read mode"); } if (writePoint_ < size_) { buffer_[writePoint_++] = event; return true; } else { // buffer is full return false; } } eventInfo* TFileTransportBuffer::getNext() { if (bufferMode_ == WRITE) { bufferMode_ = READ; } if (readPoint_ < writePoint_) { return buffer_[readPoint_++]; } else { // no more entries return nullptr; } } void TFileTransportBuffer::reset() { if (bufferMode_ == WRITE || writePoint_ > readPoint_) { T_DEBUG("%s", "Resetting a buffer with unread entries"); } // Clean up the old entries for (uint32_t i = 0; i < writePoint_; i++) { delete buffer_[i]; } bufferMode_ = WRITE; writePoint_ = 0; readPoint_ = 0; } bool TFileTransportBuffer::isFull() { return writePoint_ == size_; } bool TFileTransportBuffer::isEmpty() { return writePoint_ == 0; } TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, shared_ptr inputTransport) : processor_(processor), inputProtocolFactory_(protocolFactory), outputProtocolFactory_(protocolFactory), inputTransport_(inputTransport) { // default the output transport to a null transport (common case) outputTransport_ = std::make_shared(); } TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr inputProtocolFactory, shared_ptr outputProtocolFactory, shared_ptr inputTransport) : processor_(processor), inputProtocolFactory_(inputProtocolFactory), outputProtocolFactory_(outputProtocolFactory), inputTransport_(inputTransport) { // default the output transport to a null transport (common case) outputTransport_ = std::make_shared(); } TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, shared_ptr inputTransport, shared_ptr outputTransport) : processor_(processor), inputProtocolFactory_(protocolFactory), outputProtocolFactory_(protocolFactory), inputTransport_(inputTransport), outputTransport_(outputTransport) { } void TFileProcessor::process(uint32_t numEvents, bool tail) { shared_ptr inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); shared_ptr outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); // set the read timeout to 0 if tailing is required int32_t oldReadTimeout = inputTransport_->getReadTimeout(); if (tail) { // save old read timeout so it can be restored inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT); } uint32_t numProcessed = 0; while (1) { // bad form to use exceptions for flow control but there is really // no other way around it try { processor_->process(inputProtocol, outputProtocol, nullptr); numProcessed++; if ((numEvents > 0) && (numProcessed == numEvents)) { return; } } catch (TEOFException&) { if (!tail) { break; } } catch (TException& te) { cerr << te.what() << endl; break; } } // restore old read timeout if (tail) { inputTransport_->setReadTimeout(oldReadTimeout); } } void TFileProcessor::processChunk() { shared_ptr inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); shared_ptr outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); uint32_t curChunk = inputTransport_->getCurChunk(); while (1) { // bad form to use exceptions for flow control but there is really // no other way around it try { processor_->process(inputProtocol, outputProtocol, nullptr); if (curChunk != inputTransport_->getCurChunk()) { break; } } catch (TEOFException&) { break; } catch (TException& te) { cerr << te.what() << endl; break; } } } } } } // apache::thrift::transport