1 /***************************************************************************
2  *  foxxll/io/request_queue_impl_qwqr.cpp
3  *
4  *  Part of FOXXLL. See http://foxxll.org
5  *
6  *  Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  *  Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
8  *  Copyright (C) 2009 Johannes Singler <singler@ira.uka.de>
9  *  Copyright (C) 2013 Timo Bingmann <tb@panthema.net>
10  *
11  *  Distributed under the Boost Software License, Version 1.0.
12  *  (See accompanying file LICENSE_1_0.txt or copy at
13  *  http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #include <algorithm>
17 #include <functional>
18 
19 #include <tlx/logger/core.hpp>
20 
21 #include <foxxll/common/error_handling.hpp>
22 #include <foxxll/io/request_queue_impl_qwqr.hpp>
23 #include <foxxll/io/serving_request.hpp>
24 
25 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
26  #include <windows.h>
27 #endif
28 
29 #ifndef FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
30 #define FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
31 #endif
32 
33 namespace foxxll {
34 
35 struct file_offset_match
36     : public std::binary_function<request_ptr, request_ptr, bool>
37 {
operator ()foxxll::file_offset_match38     bool operator () (
39         const request_ptr& a,
40         const request_ptr& b) const
41     {
42         // matching file and offset are enough to cause problems
43         return (a->offset() == b->offset()) &&
44                (a->get_file() == b->get_file());
45     }
46 };
47 
request_queue_impl_qwqr(int n)48 request_queue_impl_qwqr::request_queue_impl_qwqr(int n)
49     : thread_state_(NOT_RUNNING), sem_(0)
50 {
51     tlx::unused(n);
52     start_thread(worker, static_cast<void*>(this), thread_, thread_state_);
53 }
54 
set_priority_op(const priority_op & op)55 void request_queue_impl_qwqr::set_priority_op(const priority_op& op)
56 {
57     //_priority_op = op;
58     tlx::unused(op);
59 }
60 
add_request(request_ptr & req)61 void request_queue_impl_qwqr::add_request(request_ptr& req)
62 {
63     if (req.empty())
64         FOXXLL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
65     if (thread_state_() != RUNNING)
66         FOXXLL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
67     if (!dynamic_cast<serving_request*>(req.get()))
68         TLX_LOG1 << "Incompatible request submitted to running queue.";
69 
70     if (req.get()->op() == request::READ)
71     {
72 #if FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
73         {
74             std::unique_lock<std::mutex> lock(write_mutex_);
75             if (std::find_if(
76                     write_queue_.begin(), write_queue_.end(),
77                     bind2nd(file_offset_match(), req)
78                 )
79                 != write_queue_.end())
80             {
81                 TLX_LOG1 << "READ request submitted for a BID with a pending WRITE request";
82             }
83         }
84 #endif
85         std::unique_lock<std::mutex> lock(read_mutex_);
86         read_queue_.push_back(req);
87     }
88     else
89     {
90 #if FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
91         {
92             std::unique_lock<std::mutex> lock(read_mutex_);
93             if (std::find_if(
94                     read_queue_.begin(), read_queue_.end(),
95                     bind2nd(file_offset_match(), req)
96                 )
97                 != read_queue_.end())
98             {
99                 TLX_LOG1 << "WRITE request submitted for a BID with a pending READ request";
100             }
101         }
102 #endif
103         std::unique_lock<std::mutex> lock(write_mutex_);
104         write_queue_.push_back(req);
105     }
106 
107     sem_.signal();
108 }
109 
cancel_request(request_ptr & req)110 bool request_queue_impl_qwqr::cancel_request(request_ptr& req)
111 {
112     if (req.empty())
113         FOXXLL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
114     if (thread_state_() != RUNNING)
115         FOXXLL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
116     if (!dynamic_cast<serving_request*>(req.get()))
117         TLX_LOG1 << "Incompatible request submitted to running queue.";
118 
119     bool was_still_in_queue = false;
120     if (req.get()->op() == request::READ)
121     {
122         std::unique_lock<std::mutex> lock(read_mutex_);
123         queue_type::iterator pos
124             = std::find(read_queue_.begin(), read_queue_.end(), req);
125         if (pos != read_queue_.end())
126         {
127             read_queue_.erase(pos);
128             was_still_in_queue = true;
129             lock.unlock();
130             sem_.wait();
131         }
132     }
133     else
134     {
135         std::unique_lock<std::mutex> lock(write_mutex_);
136         queue_type::iterator pos
137             = std::find(write_queue_.begin(), write_queue_.end(), req);
138         if (pos != write_queue_.end())
139         {
140             write_queue_.erase(pos);
141             was_still_in_queue = true;
142             lock.unlock();
143             sem_.wait();
144         }
145     }
146 
147     return was_still_in_queue;
148 }
149 
~request_queue_impl_qwqr()150 request_queue_impl_qwqr::~request_queue_impl_qwqr()
151 {
152     stop_thread(thread_, thread_state_, sem_);
153 }
154 
worker(void * arg)155 void* request_queue_impl_qwqr::worker(void* arg)
156 {
157     self* pthis = static_cast<self*>(arg);
158 
159     bool write_phase = true;
160     for ( ; ; )
161     {
162         pthis->sem_.wait();
163 
164         if (write_phase)
165         {
166             std::unique_lock<std::mutex> write_lock(pthis->write_mutex_);
167             if (!pthis->write_queue_.empty())
168             {
169                 request_ptr req = pthis->write_queue_.front();
170                 pthis->write_queue_.pop_front();
171 
172                 write_lock.unlock();
173 
174                 //assert(req->get_reference_count()) > 1);
175                 dynamic_cast<serving_request*>(req.get())->serve();
176             }
177             else
178             {
179                 write_lock.unlock();
180 
181                 pthis->sem_.signal();
182 
183                 if (pthis->priority_op_ == WRITE)
184                     write_phase = false;
185             }
186 
187             if (pthis->priority_op_ == NONE || pthis->priority_op_ == READ)
188                 write_phase = false;
189         }
190         else
191         {
192             std::unique_lock<std::mutex> read_lock(pthis->read_mutex_);
193 
194             if (!pthis->read_queue_.empty())
195             {
196                 request_ptr req = pthis->read_queue_.front();
197                 pthis->read_queue_.pop_front();
198 
199                 read_lock.unlock();
200 
201                 TLX_LOG << "queue: before serve request has "
202                         << req->reference_count() << " references ";
203                 //assert(req->get_reference_count() > 1);
204                 dynamic_cast<serving_request*>(req.get())->serve();
205                 TLX_LOG << "queue: after serve request has "
206                         << req->reference_count() << " references ";
207             }
208             else
209             {
210                 read_lock.unlock();
211 
212                 pthis->sem_.signal();
213 
214                 if (pthis->priority_op_ == READ)
215                     write_phase = true;
216             }
217 
218             if (pthis->priority_op_ == NONE || pthis->priority_op_ == WRITE)
219                 write_phase = true;
220         }
221 
222         // terminate if it has been requested and queues are empty
223         if (pthis->thread_state_() == TERMINATING) {
224             if (pthis->sem_.wait() == 0)
225                 break;
226             else
227                 pthis->sem_.signal();
228         }
229     }
230 
231     pthis->thread_state_.set_to(TERMINATED);
232 
233 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
234     // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
235     // request_queue_impl_worker.cpp. -tb
236     ExitThread(nullptr);
237 #else
238     return nullptr;
239 #endif
240 }
241 
242 } // namespace foxxll
243 
244 /**************************************************************************/
245