1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 21 #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1 22 23 #include <thrift/transport/TTransport.h> 24 #include <thrift/Thrift.h> 25 #include <thrift/TProcessor.h> 26 27 #include <string> 28 #include <stdio.h> 29 30 #include <boost/scoped_ptr.hpp> 31 #include <boost/shared_ptr.hpp> 32 33 #include <thrift/concurrency/Mutex.h> 34 #include <thrift/concurrency/Monitor.h> 35 #include <thrift/concurrency/PlatformThreadFactory.h> 36 #include <thrift/concurrency/Thread.h> 37 38 namespace apache 39 { 40 namespace thrift 41 { 42 namespace transport 43 { 44 45 using apache::thrift::TProcessor; 46 using apache::thrift::protocol::TProtocolFactory; 47 using apache::thrift::concurrency::Mutex; 48 using apache::thrift::concurrency::Monitor; 49 50 // Data pertaining to a single event 51 typedef struct eventInfo 52 { 53 uint8_t* eventBuff_; 54 uint32_t eventSize_; 55 uint32_t eventBuffPos_; 56 eventInfoeventInfo57 eventInfo(): eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}; ~eventInfoeventInfo58 ~eventInfo() 59 { 60 if (eventBuff_) 61 { 62 delete[] eventBuff_; 63 } 64 } 65 } eventInfo; 66 67 // information about current read state 68 typedef struct readState 69 { 70 eventInfo* event_; 71 72 // keep track of event size 73 uint8_t eventSizeBuff_[4]; 74 uint8_t eventSizeBuffPos_; 75 bool readingSize_; 76 77 // read buffer variables 78 int32_t bufferPtr_; 79 int32_t bufferLen_; 80 81 // last successful dispatch point 82 int32_t lastDispatchPtr_; 83 resetStatereadState84 void resetState(uint32_t lastDispatchPtr) 85 { 86 readingSize_ = true; 87 eventSizeBuffPos_ = 0; 88 lastDispatchPtr_ = lastDispatchPtr; 89 } 90 resetAllValuesreadState91 void resetAllValues() 92 { 93 resetState(0); 94 bufferPtr_ = 0; 95 bufferLen_ = 0; 96 97 if (event_) 98 { 99 delete (event_); 100 } 101 102 event_ = 0; 103 } 104 getEventSizereadState105 inline uint32_t getEventSize() 106 { 107 const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_); 108 return *reinterpret_cast<const uint32_t*>(buffer); 109 } 110 readStatereadState111 readState() 112 { 113 event_ = 0; 114 resetAllValues(); 115 } 116 ~readStatereadState117 ~readState() 118 { 119 if (event_) 120 { 121 delete (event_); 122 } 123 } 124 125 } readState; 126 127 /** 128 * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events 129 * to be written to disk. Should be used in the following way: 130 * 1) Buffer created 131 * 2) Buffer written to (addEvent) 132 * 3) Buffer read from (getNext) 133 * 4) Buffer reset (reset) 134 * 5) Go back to 2, or destroy buffer 135 * 136 * The buffer should never be written to after it is read from, unless it is reset first. 137 * Note: The above rules are enforced mainly for debugging its sole client TFileTransport 138 * which uses the buffer in this way. 139 * 140 */ 141 class TFileTransportBuffer 142 { 143 public: 144 TFileTransportBuffer(uint32_t size); 145 ~TFileTransportBuffer(); 146 147 bool addEvent(eventInfo* event); 148 eventInfo* getNext(); 149 void reset(); 150 bool isFull(); 151 bool isEmpty(); 152 153 private: 154 TFileTransportBuffer(); // should not be used 155 156 enum mode 157 { 158 WRITE, 159 READ 160 }; 161 mode bufferMode_; 162 163 uint32_t writePoint_; 164 uint32_t readPoint_; 165 uint32_t size_; 166 eventInfo** buffer_; 167 }; 168 169 /** 170 * Abstract interface for transports used to read files 171 */ 172 class TFileReaderTransport : virtual public TTransport 173 { 174 public: 175 virtual int32_t getReadTimeout() = 0; 176 virtual void setReadTimeout(int32_t readTimeout) = 0; 177 178 virtual uint32_t getNumChunks() = 0; 179 virtual uint32_t getCurChunk() = 0; 180 virtual void seekToChunk(int32_t chunk) = 0; 181 virtual void seekToEnd() = 0; 182 }; 183 184 /** 185 * Abstract interface for transports used to write files 186 */ 187 class TFileWriterTransport : virtual public TTransport 188 { 189 public: 190 virtual uint32_t getChunkSize() = 0; 191 virtual void setChunkSize(uint32_t chunkSize) = 0; 192 }; 193 194 /** 195 * File implementation of a transport. Reads and writes are done to a 196 * file on disk. 197 * 198 */ 199 class TFileTransport : public TFileReaderTransport, 200 public TFileWriterTransport 201 { 202 public: 203 TFileTransport(std::string path, bool readOnly = false); 204 ~TFileTransport(); 205 206 // TODO: what is the correct behaviour for this? 207 // the log file is generally always open isOpen()208 bool isOpen() 209 { 210 return true; 211 } 212 213 void write(const uint8_t* buf, uint32_t len); 214 void flush(); 215 216 uint32_t readAll(uint8_t* buf, uint32_t len); 217 uint32_t read(uint8_t* buf, uint32_t len); 218 bool peek(); 219 220 // log-file specific functions 221 void seekToChunk(int32_t chunk); 222 void seekToEnd(); 223 uint32_t getNumChunks(); 224 uint32_t getCurChunk(); 225 226 // for changing the output file 227 void resetOutputFile(int fd, std::string filename, off_t offset); 228 229 // Setter/Getter functions for user-controllable options setReadBuffSize(uint32_t readBuffSize)230 void setReadBuffSize(uint32_t readBuffSize) 231 { 232 if (readBuffSize) 233 { 234 readBuffSize_ = readBuffSize; 235 } 236 } getReadBuffSize()237 uint32_t getReadBuffSize() 238 { 239 return readBuffSize_; 240 } 241 242 static const int32_t TAIL_READ_TIMEOUT = -1; 243 static const int32_t NO_TAIL_READ_TIMEOUT = 0; setReadTimeout(int32_t readTimeout)244 void setReadTimeout(int32_t readTimeout) 245 { 246 readTimeout_ = readTimeout; 247 } getReadTimeout()248 int32_t getReadTimeout() 249 { 250 return readTimeout_; 251 } 252 setChunkSize(uint32_t chunkSize)253 void setChunkSize(uint32_t chunkSize) 254 { 255 if (chunkSize) 256 { 257 chunkSize_ = chunkSize; 258 } 259 } getChunkSize()260 uint32_t getChunkSize() 261 { 262 return chunkSize_; 263 } 264 setEventBufferSize(uint32_t bufferSize)265 void setEventBufferSize(uint32_t bufferSize) 266 { 267 if (bufferAndThreadInitialized_) 268 { 269 GlobalOutput("Cannot change the buffer size after writer thread started"); 270 return; 271 } 272 273 eventBufferSize_ = bufferSize; 274 } 275 getEventBufferSize()276 uint32_t getEventBufferSize() 277 { 278 return eventBufferSize_; 279 } 280 setFlushMaxUs(uint32_t flushMaxUs)281 void setFlushMaxUs(uint32_t flushMaxUs) 282 { 283 if (flushMaxUs) 284 { 285 flushMaxUs_ = flushMaxUs; 286 } 287 } getFlushMaxUs()288 uint32_t getFlushMaxUs() 289 { 290 return flushMaxUs_; 291 } 292 setFlushMaxBytes(uint32_t flushMaxBytes)293 void setFlushMaxBytes(uint32_t flushMaxBytes) 294 { 295 if (flushMaxBytes) 296 { 297 flushMaxBytes_ = flushMaxBytes; 298 } 299 } getFlushMaxBytes()300 uint32_t getFlushMaxBytes() 301 { 302 return flushMaxBytes_; 303 } 304 setMaxEventSize(uint32_t maxEventSize)305 void setMaxEventSize(uint32_t maxEventSize) 306 { 307 maxEventSize_ = maxEventSize; 308 } getMaxEventSize()309 uint32_t getMaxEventSize() 310 { 311 return maxEventSize_; 312 } 313 setMaxCorruptedEvents(uint32_t maxCorruptedEvents)314 void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) 315 { 316 maxCorruptedEvents_ = maxCorruptedEvents; 317 } getMaxCorruptedEvents()318 uint32_t getMaxCorruptedEvents() 319 { 320 return maxCorruptedEvents_; 321 } 322 setEofSleepTimeUs(uint32_t eofSleepTime)323 void setEofSleepTimeUs(uint32_t eofSleepTime) 324 { 325 if (eofSleepTime) 326 { 327 eofSleepTime_ = eofSleepTime; 328 } 329 } getEofSleepTimeUs()330 uint32_t getEofSleepTimeUs() 331 { 332 return eofSleepTime_; 333 } 334 335 /* 336 * Override TTransport *_virt() functions to invoke our implementations. 337 * We cannot use TVirtualTransport to provide these, since we need to inherit 338 * virtually from TTransport. 339 */ read_virt(uint8_t * buf,uint32_t len)340 virtual uint32_t read_virt(uint8_t* buf, uint32_t len) 341 { 342 return this->read(buf, len); 343 } readAll_virt(uint8_t * buf,uint32_t len)344 virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) 345 { 346 return this->readAll(buf, len); 347 } write_virt(const uint8_t * buf,uint32_t len)348 virtual void write_virt(const uint8_t* buf, uint32_t len) 349 { 350 this->write(buf, len); 351 } 352 353 private: 354 // helper functions for writing to a file 355 void enqueueEvent(const uint8_t* buf, uint32_t eventLen); 356 bool swapEventBuffers(struct timeval* deadline); 357 bool initBufferAndWriteThread(); 358 359 // control for writer thread startWriterThread(void * ptr)360 static void* startWriterThread(void* ptr) 361 { 362 static_cast<TFileTransport*>(ptr)->writerThread(); 363 return NULL; 364 } 365 void writerThread(); 366 367 // helper functions for reading from a file 368 eventInfo* readEvent(); 369 370 // event corruption-related functions 371 bool isEventCorrupted(); 372 void performRecovery(); 373 374 // Utility functions 375 void openLogFile(); 376 void getNextFlushTime(struct timeval* ts_next_flush); 377 378 // Class variables 379 readState readState_; 380 uint8_t* readBuff_; 381 eventInfo* currentEvent_; 382 383 uint32_t readBuffSize_; 384 static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024; 385 386 int32_t readTimeout_; 387 static const int32_t DEFAULT_READ_TIMEOUT_MS = 200; 388 389 // size of chunks that file will be split up into 390 uint32_t chunkSize_; 391 static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; 392 393 // size of event buffers 394 uint32_t eventBufferSize_; 395 static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000; 396 397 // max number of microseconds that can pass without flushing 398 uint32_t flushMaxUs_; 399 static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000; 400 401 // max number of bytes that can be written without flushing 402 uint32_t flushMaxBytes_; 403 static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024; 404 405 // max event size 406 uint32_t maxEventSize_; 407 static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0; 408 409 // max number of corrupted events per chunk 410 uint32_t maxCorruptedEvents_; 411 static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0; 412 413 // sleep duration when EOF is hit 414 uint32_t eofSleepTime_; 415 static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000; 416 417 // sleep duration when a corrupted event is encountered 418 uint32_t corruptedEventSleepTime_; 419 static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000; 420 421 // sleep duration in seconds when an IO error is encountered in the writer thread 422 uint32_t writerThreadIOErrorSleepTime_; 423 static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000; 424 425 // writer thread 426 apache::thrift::concurrency::PlatformThreadFactory threadFactory_; 427 boost::shared_ptr<apache::thrift::concurrency::Thread> writerThread_; 428 429 // buffers to hold data before it is flushed. Each element of the buffer stores a msg that 430 // needs to be written to the file. The buffers are swapped by the writer thread. 431 TFileTransportBuffer* dequeueBuffer_; 432 TFileTransportBuffer* enqueueBuffer_; 433 434 // conditions used to block when the buffer is full or empty 435 Monitor notFull_, notEmpty_; 436 volatile bool closing_; 437 438 // To keep track of whether the buffer has been flushed 439 Monitor flushed_; 440 volatile bool forceFlush_; 441 442 // Mutex that is grabbed when enqueueing and swapping the read/write buffers 443 Mutex mutex_; 444 445 // File information 446 std::string filename_; 447 int fd_; 448 449 // Whether the writer thread and buffers have been initialized 450 bool bufferAndThreadInitialized_; 451 452 // Offset within the file 453 off_t offset_; 454 455 // event corruption information 456 uint32_t lastBadChunk_; 457 uint32_t numCorruptedEventsInChunk_; 458 459 bool readOnly_; 460 }; 461 462 // Exception thrown when EOF is hit 463 class TEOFException : public TTransportException 464 { 465 public: TEOFException()466 TEOFException(): 467 TTransportException(TTransportException::END_OF_FILE) {}; 468 }; 469 470 471 // wrapper class to process events from a file containing thrift events 472 class TFileProcessor 473 { 474 public: 475 /** 476 * Constructor that defaults output transport to null transport 477 * 478 * @param processor processes log-file events 479 * @param protocolFactory protocol factory 480 * @param inputTransport file transport 481 */ 482 TFileProcessor(boost::shared_ptr<TProcessor> processor, 483 boost::shared_ptr<TProtocolFactory> protocolFactory, 484 boost::shared_ptr<TFileReaderTransport> inputTransport); 485 486 TFileProcessor(boost::shared_ptr<TProcessor> processor, 487 boost::shared_ptr<TProtocolFactory> inputProtocolFactory, 488 boost::shared_ptr<TProtocolFactory> outputProtocolFactory, 489 boost::shared_ptr<TFileReaderTransport> inputTransport); 490 491 /** 492 * Constructor 493 * 494 * @param processor processes log-file events 495 * @param protocolFactory protocol factory 496 * @param inputTransport input file transport 497 * @param output output transport 498 */ 499 TFileProcessor(boost::shared_ptr<TProcessor> processor, 500 boost::shared_ptr<TProtocolFactory> protocolFactory, 501 boost::shared_ptr<TFileReaderTransport> inputTransport, 502 boost::shared_ptr<TTransport> outputTransport); 503 504 /** 505 * processes events from the file 506 * 507 * @param numEvents number of events to process (0 for unlimited) 508 * @param tail tails the file if true 509 */ 510 void process(uint32_t numEvents, bool tail); 511 512 /** 513 * process events until the end of the chunk 514 * 515 */ 516 void processChunk(); 517 518 private: 519 boost::shared_ptr<TProcessor> processor_; 520 boost::shared_ptr<TProtocolFactory> inputProtocolFactory_; 521 boost::shared_ptr<TProtocolFactory> outputProtocolFactory_; 522 boost::shared_ptr<TFileReaderTransport> inputTransport_; 523 boost::shared_ptr<TTransport> outputTransport_; 524 }; 525 526 527 } 528 } 529 } // apache::thrift::transport 530 531 #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 532