/*

Copyright (c) 2007-2018, Arvid Norberg, Steven Siloti
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in
      the documentation and/or other materials provided with the distribution.
    * Neither the name of the author nor the names of its
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
30 31 */ 32 33 #ifndef TORRENT_DISK_IO_THREAD 34 #define TORRENT_DISK_IO_THREAD 35 36 #include "libtorrent/config.hpp" 37 #include "libtorrent/fwd.hpp" 38 #include "libtorrent/debug.hpp" 39 #include "libtorrent/storage.hpp" 40 #include "libtorrent/io_service.hpp" 41 #include "libtorrent/disk_io_thread_pool.hpp" 42 #include "libtorrent/disk_io_job.hpp" 43 #include "libtorrent/disk_job_pool.hpp" 44 #include "libtorrent/block_cache.hpp" 45 #include "libtorrent/file_pool.hpp" 46 #include "libtorrent/disk_interface.hpp" 47 #include "libtorrent/performance_counters.hpp" 48 #include "libtorrent/aux_/session_settings.hpp" 49 50 #include <mutex> 51 #include <condition_variable> 52 #include <atomic> 53 #include <memory> 54 #include <vector> 55 56 namespace libtorrent { 57 58 struct counters; 59 class alert_manager; 60 61 namespace aux { struct block_cache_reference; } 62 63 struct cached_piece_info 64 { 65 storage_interface* storage; 66 67 // holds one entry for each block in this piece. ``true`` represents 68 // the data for that block being in the disk cache and ``false`` means it's not. 69 std::vector<bool> blocks; 70 71 // the time when a block was last written to this piece. The older 72 // a piece is, the more likely it is to be flushed to disk. 73 time_point last_use; 74 75 // The index of the next block that needs to be hashed. 76 // Blocks are hashed as they are downloaded in order to not 77 // have to re-read them from disk once the piece is complete, to 78 // compare its hash against the hashes in the .torrent file. 79 int next_to_hash; 80 81 // the piece index for this cache entry. 82 piece_index_t piece; 83 84 enum kind_t { read_cache = 0, write_cache = 1, volatile_read_cache = 2 }; 85 86 // specifies if this piece is part of the read cache or the write cache. 
87 kind_t kind; 88 89 bool need_readback; 90 }; 91 92 using jobqueue_t = tailqueue<disk_io_job>; 93 94 // this struct holds a number of statistics counters 95 // relevant for the disk io thread and disk cache. 96 struct TORRENT_EXPORT cache_status 97 { 98 // initializes all counters to 0 cache_statuslibtorrent::cache_status99 cache_status() 100 : pieces() 101 #if TORRENT_ABI_VERSION == 1 102 , blocks_written(0) 103 , writes(0) 104 , blocks_read(0) 105 , blocks_read_hit(0) 106 , reads(0) 107 , queued_bytes(0) 108 , cache_size(0) 109 , write_cache_size(0) 110 , read_cache_size(0) 111 , pinned_blocks(0) 112 , total_used_buffers(0) 113 , average_read_time(0) 114 , average_write_time(0) 115 , average_hash_time(0) 116 , average_job_time(0) 117 , cumulative_job_time(0) 118 , cumulative_read_time(0) 119 , cumulative_write_time(0) 120 , cumulative_hash_time(0) 121 , total_read_back(0) 122 , read_queue_size(0) 123 , blocked_jobs(0) 124 , queued_jobs(0) 125 , peak_queued(0) 126 , pending_jobs(0) 127 , num_jobs(0) 128 , num_read_jobs(0) 129 , num_write_jobs(0) 130 , arc_mru_size(0) 131 , arc_mru_ghost_size(0) 132 , arc_mfu_size(0) 133 , arc_mfu_ghost_size(0) 134 , arc_write_size(0) 135 , arc_volatile_size(0) 136 , num_writing_threads(0) 137 #endif 138 { 139 #if TORRENT_ABI_VERSION == 1 140 std::memset(num_fence_jobs, 0, sizeof(num_fence_jobs)); 141 #endif 142 } 143 144 std::vector<cached_piece_info> pieces; 145 146 #if TORRENT_ABI_VERSION == 1 147 // the total number of 16 KiB blocks written to disk 148 // since this session was started. 149 int blocks_written; 150 151 // the total number of write operations performed since this 152 // session was started. 153 // 154 // The ratio (``blocks_written`` - ``writes``) / ``blocks_written`` represents 155 // the number of saved write operations per total write operations. i.e. a kind 156 // of cache hit ratio for the write cahe. 
157 int writes; 158 159 // the number of blocks that were requested from the 160 // bittorrent engine (from peers), that were served from disk or cache. 161 int blocks_read; 162 163 // the number of blocks that was just copied from the read cache 164 // 165 // The ratio ``blocks_read_hit`` / ``blocks_read`` is the cache hit ratio 166 // for the read cache. 167 int blocks_read_hit; 168 169 // the number of read operations used 170 int reads; 171 172 // the number of bytes queued for writing, including bytes 173 // submitted to the OS for writing, but not yet complete 174 mutable std::int64_t queued_bytes; 175 176 // the number of 16 KiB blocks currently in the disk cache (both read and write). 177 // This includes both read and write cache. 178 int cache_size; 179 180 // the number of blocks in the cache used for write cache 181 int write_cache_size; 182 183 // the number of 16KiB blocks in the read cache. 184 int read_cache_size; 185 186 // the number of blocks with a refcount > 0, i.e. 187 // they may not be evicted 188 int pinned_blocks; 189 190 // the total number of buffers currently in use. 191 // This includes the read/write disk cache as well as send and receive buffers 192 // used in peer connections. 193 // deprecated, use session_stats_metrics "disk.disk_blocks_in_use" 194 mutable int total_used_buffers; 195 196 // the number of microseconds an average disk I/O job 197 // has to wait in the job queue before it get processed. 198 199 // the time read jobs takes on average to complete 200 // (not including the time in the queue), in microseconds. This only measures 201 // read cache misses. 202 int average_read_time; 203 204 // the time write jobs takes to complete, on average, 205 // in microseconds. This does not include the time the job sits in the disk job 206 // queue or in the write cache, only blocks that are flushed to disk. 207 int average_write_time; 208 209 // the time hash jobs takes to complete on average, in 210 // microseconds. 
Hash jobs include running SHA-1 on the data (which for the most 211 // part is done incrementally) and sometimes reading back parts of the piece. It 212 // also includes checking files without valid resume data. 213 int average_hash_time; 214 int average_job_time; 215 216 // the number of milliseconds spent in all disk jobs, and specific ones 217 // since the start of the session. Times are specified in milliseconds 218 int cumulative_job_time; 219 int cumulative_read_time; 220 int cumulative_write_time; 221 int cumulative_hash_time; 222 223 // the number of blocks that had to be read back from disk because 224 // they were flushed before the SHA-1 hash got to hash them. If this 225 // is large, a larger cache could significantly improve performance 226 int total_read_back; 227 228 // number of read jobs in the disk job queue 229 int read_queue_size; 230 231 // number of jobs blocked because of a fence 232 int blocked_jobs; 233 234 // number of jobs waiting to be issued (m_to_issue) 235 // average over 30 seconds 236 // deprecated, use session_stats_metrics "disk.queued_disk_jobs" 237 int queued_jobs; 238 239 // largest ever seen number of queued jobs 240 int peak_queued; 241 242 // number of jobs waiting to complete (m_pending) 243 // average over 30 seconds 244 int pending_jobs; 245 246 // total number of disk job objects allocated right now 247 int num_jobs; 248 249 // total number of disk read job objects allocated right now 250 int num_read_jobs; 251 252 // total number of disk write job objects allocated right now 253 int num_write_jobs; 254 255 // ARC cache stats. All of these counters are in number of pieces 256 // not blocks. A piece does not necessarily correspond to a certain 257 // number of blocks. 
The pieces in the ghost list never have any 258 // blocks in them 259 int arc_mru_size; 260 int arc_mru_ghost_size; 261 int arc_mfu_size; 262 int arc_mfu_ghost_size; 263 int arc_write_size; 264 int arc_volatile_size; 265 266 // the number of threads currently writing to disk 267 int num_writing_threads; 268 269 // counts only fence jobs that are currently blocking jobs 270 // not fences that are themself blocked 271 int num_fence_jobs[static_cast<int>(job_action_t::num_job_ids)]; 272 #endif 273 }; 274 275 // this is a singleton consisting of the thread and a queue 276 // of disk io jobs 277 struct TORRENT_EXTRA_EXPORT disk_io_thread final 278 : disk_job_pool 279 , disk_interface 280 , buffer_allocator_interface 281 { 282 disk_io_thread(io_service& ios, aux::session_settings const&, counters&); 283 #if TORRENT_USE_ASSERTS 284 ~disk_io_thread(); 285 #endif 286 287 enum 288 { 289 // every 4:th thread is a hash thread 290 hasher_thread_divisor = 4 291 }; 292 293 void settings_updated(); 294 295 void abort(bool wait); 296 297 storage_holder new_torrent(storage_constructor_type sc 298 , storage_params p, std::shared_ptr<void> const&) override; 299 void remove_torrent(storage_index_t) override; 300 301 void async_read(storage_index_t storage, peer_request const& r 302 , std::function<void(disk_buffer_holder block 303 , disk_job_flags_t flags, storage_error const& se)> handler, disk_job_flags_t flags = {}) override; 304 bool async_write(storage_index_t storage, peer_request const& r 305 , char const* buf, std::shared_ptr<disk_observer> o 306 , std::function<void(storage_error const&)> handler 307 , disk_job_flags_t flags = {}) override; 308 void async_hash(storage_index_t storage, piece_index_t piece, disk_job_flags_t flags 309 , std::function<void(piece_index_t, sha1_hash const&, storage_error const&)> handler) override; 310 void async_move_storage(storage_index_t storage, std::string p, move_flags_t flags 311 , std::function<void(status_t, std::string const&, 
storage_error const&)> handler) override; 312 void async_release_files(storage_index_t storage 313 , std::function<void()> handler = std::function<void()>()) override; 314 void async_delete_files(storage_index_t storage, remove_flags_t options 315 , std::function<void(storage_error const&)> handler) override; 316 void async_check_files(storage_index_t storage 317 , add_torrent_params const* resume_data 318 , aux::vector<std::string, file_index_t>& links 319 , std::function<void(status_t, storage_error const&)> handler) override; 320 void async_rename_file(storage_index_t storage, file_index_t index, std::string name 321 , std::function<void(std::string const&, file_index_t, storage_error const&)> handler) override; 322 void async_stop_torrent(storage_index_t storage 323 , std::function<void()> handler) override; 324 void async_flush_piece(storage_index_t storage, piece_index_t piece 325 , std::function<void()> handler = std::function<void()>()) override; 326 void async_set_file_priority(storage_index_t storage 327 , aux::vector<download_priority_t, file_index_t> prio 328 , std::function<void(storage_error const&, aux::vector<download_priority_t, file_index_t>)> handler) override; 329 330 void async_clear_piece(storage_index_t storage, piece_index_t index 331 , std::function<void(piece_index_t)> handler) override; 332 // this is not asynchronous and requires that the piece does not 333 // have any pending buffers. It's meant to be used for pieces that 334 // were just read and hashed and failed the hash check. 
335 // there should be no read-operations left, and all buffers should 336 // be discardable 337 void clear_piece(storage_index_t storage, piece_index_t index) override; 338 339 // implements buffer_allocator_interface 340 void reclaim_blocks(span<aux::block_cache_reference> ref) override; free_disk_bufferlibtorrent::disk_io_thread341 void free_disk_buffer(char* buf) override { m_disk_cache.free_buffer(buf); } 342 void trigger_cache_trim(); 343 void update_stats_counters(counters& c) const override; 344 void get_cache_info(cache_status* ret, storage_index_t storage 345 , bool no_pieces, bool session) const override; 346 storage_interface* get_torrent(storage_index_t) override; 347 348 std::vector<open_file_state> get_status(storage_index_t) const override; 349 350 // this submits all queued up jobs to the thread 351 void submit_jobs() override; 352 cachelibtorrent::disk_io_thread353 block_cache* cache() { return &m_disk_cache; } 354 355 #if TORRENT_USE_ASSERTS is_disk_bufferlibtorrent::disk_io_thread356 bool is_disk_buffer(char* buffer) const override 357 { return m_disk_cache.is_disk_buffer(buffer); } 358 #endif 359 360 int prep_read_job_impl(disk_io_job* j, bool check_fence = true); 361 362 void maybe_issue_queued_read_jobs(cached_piece_entry* pe, 363 jobqueue_t& completed_jobs); 364 status_t do_read(disk_io_job* j, jobqueue_t& completed_jobs); 365 status_t do_uncached_read(disk_io_job* j); 366 367 status_t do_write(disk_io_job* j, jobqueue_t& completed_jobs); 368 status_t do_uncached_write(disk_io_job* j); 369 370 status_t do_hash(disk_io_job* j, jobqueue_t& completed_jobs); 371 status_t do_uncached_hash(disk_io_job* j); 372 373 status_t do_move_storage(disk_io_job* j, jobqueue_t& completed_jobs); 374 status_t do_release_files(disk_io_job* j, jobqueue_t& completed_jobs); 375 status_t do_delete_files(disk_io_job* j, jobqueue_t& completed_jobs); 376 status_t do_check_fastresume(disk_io_job* j, jobqueue_t& completed_jobs); 377 status_t do_rename_file(disk_io_job* 
j, jobqueue_t& completed_jobs); 378 status_t do_stop_torrent(disk_io_job* j, jobqueue_t& completed_jobs); 379 status_t do_flush_piece(disk_io_job* j, jobqueue_t& completed_jobs); 380 status_t do_flush_hashed(disk_io_job* j, jobqueue_t& completed_jobs); 381 status_t do_flush_storage(disk_io_job* j, jobqueue_t& completed_jobs); 382 status_t do_trim_cache(disk_io_job* j, jobqueue_t& completed_jobs); 383 status_t do_file_priority(disk_io_job* j, jobqueue_t& completed_jobs); 384 status_t do_clear_piece(disk_io_job* j, jobqueue_t& completed_jobs); 385 386 void call_job_handlers(); 387 388 private: 389 390 struct job_queue : pool_thread_interface 391 { job_queuelibtorrent::disk_io_thread::job_queue392 explicit job_queue(disk_io_thread& owner) : m_owner(owner) {} 393 notify_alllibtorrent::disk_io_thread::job_queue394 void notify_all() override 395 { 396 m_job_cond.notify_all(); 397 } 398 thread_funlibtorrent::disk_io_thread::job_queue399 void thread_fun(disk_io_thread_pool& pool, io_service::work work) override 400 { 401 ADD_OUTSTANDING_ASYNC("disk_io_thread::work"); 402 m_owner.thread_fun(*this, pool); 403 404 // w's dtor releases the io_service to allow the run() call to return 405 // we do this once we stop posting new callbacks to it. 
406 // after the dtor has been called, the disk_io_thread object may be destructed 407 TORRENT_UNUSED(work); 408 COMPLETE_ASYNC("disk_io_thread::work"); 409 } 410 411 disk_io_thread& m_owner; 412 413 // used to wake up the disk IO thread when there are new 414 // jobs on the job queue (m_queued_jobs) 415 std::condition_variable m_job_cond; 416 417 // jobs queued for servicing 418 jobqueue_t m_queued_jobs; 419 }; 420 421 void thread_fun(job_queue& queue, disk_io_thread_pool& pool); 422 423 // returns true if the thread should exit 424 static bool wait_for_job(job_queue& jobq, disk_io_thread_pool& threads 425 , std::unique_lock<std::mutex>& l); 426 427 void add_completed_jobs(jobqueue_t& jobs); 428 void add_completed_jobs_impl(jobqueue_t& jobs 429 , jobqueue_t& completed_jobs); 430 431 void fail_jobs(storage_error const& e, jobqueue_t& jobs_); 432 void fail_jobs_impl(storage_error const& e, jobqueue_t& src, jobqueue_t& dst); 433 434 void check_cache_level(std::unique_lock<std::mutex>& l, jobqueue_t& completed_jobs); 435 436 void perform_job(disk_io_job* j, jobqueue_t& completed_jobs); 437 438 // this queues up another job to be submitted 439 void add_job(disk_io_job* j, bool user_add = true); 440 void add_fence_job(disk_io_job* j, bool user_add = true); 441 442 // assumes l is locked (cache std::mutex). 
443 // writes out the blocks [start, end) (releases the lock 444 // during the file operation) 445 int flush_range(cached_piece_entry* p, int start, int end 446 , jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l); 447 448 // low level flush operations, used by flush_range 449 int build_iovec(cached_piece_entry* pe, int start, int end 450 , span<iovec_t> iov, span<int> flushing, int block_base_index = 0); 451 void flush_iovec(cached_piece_entry* pe, span<iovec_t const> iov, span<int const> flushing 452 , int num_blocks, storage_error& error); 453 // returns true if the piece entry was freed 454 bool iovec_flushed(cached_piece_entry* pe 455 , int* flushing, int num_blocks, int block_offset 456 , storage_error const& error 457 , jobqueue_t& completed_jobs); 458 459 // assumes l is locked (the cache std::mutex). 460 // assumes pe->hash to be set. 461 // If there are new blocks in piece 'pe' that have not been 462 // hashed by the partial_hash object attached to this piece, 463 // the piece will 464 void kick_hasher(cached_piece_entry* pe, std::unique_lock<std::mutex>& l); 465 466 // flags to pass in to flush_cache() 467 enum flush_flags_t : std::uint32_t 468 { 469 // only flush read cache (this is cheap) 470 flush_read_cache = 1, 471 // flush read cache, and write cache 472 flush_write_cache = 2, 473 // flush read cache, delete write cache without flushing to disk 474 flush_delete_cache = 4, 475 // expect all pieces for the storage to have been 476 // cleared when flush_cache() returns. 
This is only 477 // used for asserts and only applies for fence jobs 478 flush_expect_clear = 8 479 }; 480 void flush_cache(storage_interface* storage, std::uint32_t flags, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l); 481 void flush_expired_write_blocks(jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l); 482 void flush_piece(cached_piece_entry* pe, std::uint32_t flags, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l); 483 484 int try_flush_hashed(cached_piece_entry* p, int cont_blocks, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l); 485 486 void try_flush_write_blocks(int num, jobqueue_t& completed_jobs, std::unique_lock<std::mutex>& l); 487 488 void maybe_flush_write_blocks(); 489 void execute_job(disk_io_job* j); 490 void immediate_execute(); 491 void abort_jobs(); 492 void abort_hash_jobs(storage_index_t storage); 493 494 // returns the maximum number of threads 495 // the actual number of threads may be less 496 int num_threads() const; 497 job_queue& queue_for_job(disk_io_job* j); 498 disk_io_thread_pool& pool_for_job(disk_io_job* j); 499 500 // set to true once we start shutting down 501 std::atomic<bool> m_abort{false}; 502 503 // this is a counter of how many threads are currently running. 504 // it's used to identify the last thread still running while 505 // shutting down. 
This last thread is responsible for cleanup 506 // must hold the job mutex to access 507 int m_num_running_threads = 0; 508 509 // std::mutex to protect the m_generic_io_jobs and m_hash_io_jobs lists 510 mutable std::mutex m_job_mutex; 511 512 // most jobs are posted to m_generic_io_jobs 513 // but hash jobs are posted to m_hash_io_jobs if m_hash_threads 514 // has a non-zero maximum thread count 515 job_queue m_generic_io_jobs; 516 disk_io_thread_pool m_generic_threads; 517 job_queue m_hash_io_jobs; 518 disk_io_thread_pool m_hash_threads; 519 520 aux::session_settings const& m_settings; 521 522 // the last time we expired write blocks from the cache 523 time_point m_last_cache_expiry = min_time(); 524 525 // we call close_oldest_file on the file_pool regularly. This is the next 526 // time we should call it 527 time_point m_next_close_oldest_file = min_time(); 528 529 // LRU cache of open files 530 file_pool m_file_pool{40}; 531 532 // disk cache 533 mutable std::mutex m_cache_mutex; 534 block_cache m_disk_cache; 535 enum 536 { 537 cache_check_idle, 538 cache_check_active, 539 cache_check_reinvoke 540 }; 541 int m_cache_check_state = cache_check_idle; 542 543 // total number of blocks in use by both the read 544 // and the write cache. This is not supposed to 545 // exceed m_cache_size 546 547 counters& m_stats_counters; 548 549 // this is the main thread io_service. Callbacks are 550 // posted on this in order to have them execute in 551 // the main thread. 
552 io_service& m_ios; 553 554 // jobs that are completed are put on this queue 555 // whenever the queue size grows from 0 to 1 556 // a message is posted to the network thread, which 557 // will then drain the queue and execute the jobs' 558 // handler functions 559 std::mutex m_completed_jobs_mutex; 560 jobqueue_t m_completed_jobs; 561 562 // storages that have had write activity recently and will get ticked 563 // soon, for deferred actions (say, flushing partfile metadata) 564 std::vector<std::pair<time_point, std::weak_ptr<storage_interface>>> m_need_tick; 565 std::mutex m_need_tick_mutex; 566 567 // this is protected by the completed_jobs_mutex. It's true whenever 568 // there's a call_job_handlers message in-flight to the network thread. We 569 // only ever keep one such message in flight at a time, and coalesce 570 // completion callbacks in m_completed jobs 571 bool m_job_completions_in_flight = false; 572 573 aux::vector<std::shared_ptr<storage_interface>, storage_index_t> m_torrents; 574 575 // indices into m_torrents to empty slots 576 std::vector<storage_index_t> m_free_slots; 577 578 std::atomic_flag m_jobs_aborted = ATOMIC_FLAG_INIT; 579 580 #if TORRENT_USE_ASSERTS 581 int m_magic = 0x1337; 582 #endif 583 }; 584 } 585 586 #endif 587