1 /***************************************************************************
2 * foxxll/io/request_queue_impl_qwqr.cpp
3 *
4 * Part of FOXXLL. See http://foxxll.org
5 *
6 * Copyright (C) 2002-2005 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7 * Copyright (C) 2008, 2009 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
8 * Copyright (C) 2009 Johannes Singler <singler@ira.uka.de>
9 * Copyright (C) 2013 Timo Bingmann <tb@panthema.net>
10 *
11 * Distributed under the Boost Software License, Version 1.0.
12 * (See accompanying file LICENSE_1_0.txt or copy at
13 * http://www.boost.org/LICENSE_1_0.txt)
14 **************************************************************************/
15
16 #include <algorithm>
17 #include <functional>
18
19 #include <tlx/logger/core.hpp>
20
21 #include <foxxll/common/error_handling.hpp>
22 #include <foxxll/io/request_queue_impl_qwqr.hpp>
23 #include <foxxll/io/serving_request.hpp>
24
25 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
26 #include <windows.h>
27 #endif
28
29 #ifndef FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
30 #define FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
31 #endif
32
33 namespace foxxll {
34
35 struct file_offset_match
36 : public std::binary_function<request_ptr, request_ptr, bool>
37 {
operator ()foxxll::file_offset_match38 bool operator () (
39 const request_ptr& a,
40 const request_ptr& b) const
41 {
42 // matching file and offset are enough to cause problems
43 return (a->offset() == b->offset()) &&
44 (a->get_file() == b->get_file());
45 }
46 };
47
request_queue_impl_qwqr(int n)48 request_queue_impl_qwqr::request_queue_impl_qwqr(int n)
49 : thread_state_(NOT_RUNNING), sem_(0)
50 {
51 tlx::unused(n);
52 start_thread(worker, static_cast<void*>(this), thread_, thread_state_);
53 }
54
set_priority_op(const priority_op & op)55 void request_queue_impl_qwqr::set_priority_op(const priority_op& op)
56 {
57 //_priority_op = op;
58 tlx::unused(op);
59 }
60
add_request(request_ptr & req)61 void request_queue_impl_qwqr::add_request(request_ptr& req)
62 {
63 if (req.empty())
64 FOXXLL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
65 if (thread_state_() != RUNNING)
66 FOXXLL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
67 if (!dynamic_cast<serving_request*>(req.get()))
68 TLX_LOG1 << "Incompatible request submitted to running queue.";
69
70 if (req.get()->op() == request::READ)
71 {
72 #if FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
73 {
74 std::unique_lock<std::mutex> lock(write_mutex_);
75 if (std::find_if(
76 write_queue_.begin(), write_queue_.end(),
77 bind2nd(file_offset_match(), req)
78 )
79 != write_queue_.end())
80 {
81 TLX_LOG1 << "READ request submitted for a BID with a pending WRITE request";
82 }
83 }
84 #endif
85 std::unique_lock<std::mutex> lock(read_mutex_);
86 read_queue_.push_back(req);
87 }
88 else
89 {
90 #if FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
91 {
92 std::unique_lock<std::mutex> lock(read_mutex_);
93 if (std::find_if(
94 read_queue_.begin(), read_queue_.end(),
95 bind2nd(file_offset_match(), req)
96 )
97 != read_queue_.end())
98 {
99 TLX_LOG1 << "WRITE request submitted for a BID with a pending READ request";
100 }
101 }
102 #endif
103 std::unique_lock<std::mutex> lock(write_mutex_);
104 write_queue_.push_back(req);
105 }
106
107 sem_.signal();
108 }
109
cancel_request(request_ptr & req)110 bool request_queue_impl_qwqr::cancel_request(request_ptr& req)
111 {
112 if (req.empty())
113 FOXXLL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
114 if (thread_state_() != RUNNING)
115 FOXXLL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
116 if (!dynamic_cast<serving_request*>(req.get()))
117 TLX_LOG1 << "Incompatible request submitted to running queue.";
118
119 bool was_still_in_queue = false;
120 if (req.get()->op() == request::READ)
121 {
122 std::unique_lock<std::mutex> lock(read_mutex_);
123 queue_type::iterator pos
124 = std::find(read_queue_.begin(), read_queue_.end(), req);
125 if (pos != read_queue_.end())
126 {
127 read_queue_.erase(pos);
128 was_still_in_queue = true;
129 lock.unlock();
130 sem_.wait();
131 }
132 }
133 else
134 {
135 std::unique_lock<std::mutex> lock(write_mutex_);
136 queue_type::iterator pos
137 = std::find(write_queue_.begin(), write_queue_.end(), req);
138 if (pos != write_queue_.end())
139 {
140 write_queue_.erase(pos);
141 was_still_in_queue = true;
142 lock.unlock();
143 sem_.wait();
144 }
145 }
146
147 return was_still_in_queue;
148 }
149
//! Shuts the queue down by passing the worker thread handle, the shared
//! thread state and the semaphore to stop_thread().
//! NOTE(review): stop_thread() is defined elsewhere; presumably it requests
//! termination and waits for worker() to finish -- confirm.
request_queue_impl_qwqr::~request_queue_impl_qwqr()
{
    stop_thread(thread_, thread_state_, sem_);
}
154
worker(void * arg)155 void* request_queue_impl_qwqr::worker(void* arg)
156 {
157 self* pthis = static_cast<self*>(arg);
158
159 bool write_phase = true;
160 for ( ; ; )
161 {
162 pthis->sem_.wait();
163
164 if (write_phase)
165 {
166 std::unique_lock<std::mutex> write_lock(pthis->write_mutex_);
167 if (!pthis->write_queue_.empty())
168 {
169 request_ptr req = pthis->write_queue_.front();
170 pthis->write_queue_.pop_front();
171
172 write_lock.unlock();
173
174 //assert(req->get_reference_count()) > 1);
175 dynamic_cast<serving_request*>(req.get())->serve();
176 }
177 else
178 {
179 write_lock.unlock();
180
181 pthis->sem_.signal();
182
183 if (pthis->priority_op_ == WRITE)
184 write_phase = false;
185 }
186
187 if (pthis->priority_op_ == NONE || pthis->priority_op_ == READ)
188 write_phase = false;
189 }
190 else
191 {
192 std::unique_lock<std::mutex> read_lock(pthis->read_mutex_);
193
194 if (!pthis->read_queue_.empty())
195 {
196 request_ptr req = pthis->read_queue_.front();
197 pthis->read_queue_.pop_front();
198
199 read_lock.unlock();
200
201 TLX_LOG << "queue: before serve request has "
202 << req->reference_count() << " references ";
203 //assert(req->get_reference_count() > 1);
204 dynamic_cast<serving_request*>(req.get())->serve();
205 TLX_LOG << "queue: after serve request has "
206 << req->reference_count() << " references ";
207 }
208 else
209 {
210 read_lock.unlock();
211
212 pthis->sem_.signal();
213
214 if (pthis->priority_op_ == READ)
215 write_phase = true;
216 }
217
218 if (pthis->priority_op_ == NONE || pthis->priority_op_ == WRITE)
219 write_phase = true;
220 }
221
222 // terminate if it has been requested and queues are empty
223 if (pthis->thread_state_() == TERMINATING) {
224 if (pthis->sem_.wait() == 0)
225 break;
226 else
227 pthis->sem_.signal();
228 }
229 }
230
231 pthis->thread_state_.set_to(TERMINATED);
232
233 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
234 // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
235 // request_queue_impl_worker.cpp. -tb
236 ExitThread(nullptr);
237 #else
238 return nullptr;
239 #endif
240 }
241
242 } // namespace foxxll
243
244 /**************************************************************************/
245