1// 2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) 3// 4// Distributed under the Boost Software License, Version 1.0. (See accompanying 5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6// 7// Official repository: https://github.com/boostorg/beast 8// 9 10#ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP 11#define BOOST_BEAST_TEST_IMPL_STREAM_IPP 12 13#include <boost/beast/_experimental/test/stream.hpp> 14#include <boost/beast/core/bind_handler.hpp> 15#include <boost/beast/core/buffer_traits.hpp> 16#include <boost/make_shared.hpp> 17#include <stdexcept> 18#include <vector> 19 20namespace boost { 21namespace beast { 22namespace test { 23 24//------------------------------------------------------------------------------ 25 26stream:: 27service:: 28service(net::execution_context& ctx) 29 : beast::detail::service_base<service>(ctx) 30 , sp_(boost::make_shared<service_impl>()) 31{ 32} 33 34void 35stream:: 36service:: 37shutdown() 38{ 39 std::vector<std::unique_ptr<read_op_base>> v; 40 std::lock_guard<std::mutex> g1(sp_->m_); 41 v.reserve(sp_->v_.size()); 42 for(auto p : sp_->v_) 43 { 44 std::lock_guard<std::mutex> g2(p->m); 45 v.emplace_back(std::move(p->op)); 46 p->code = status::eof; 47 } 48} 49 50auto 51stream:: 52service:: 53make_impl( 54 net::io_context& ctx, 55 test::fail_count* fc) -> 56 boost::shared_ptr<state> 57{ 58 auto& svc = net::use_service<service>(ctx); 59 auto sp = boost::make_shared<state>(ctx, svc.sp_, fc); 60 std::lock_guard<std::mutex> g(svc.sp_->m_); 61 svc.sp_->v_.push_back(sp.get()); 62 return sp; 63} 64 65void 66stream:: 67service_impl:: 68remove(state& impl) 69{ 70 std::lock_guard<std::mutex> g(m_); 71 *std::find( 72 v_.begin(), v_.end(), 73 &impl) = std::move(v_.back()); 74 v_.pop_back(); 75} 76 77//------------------------------------------------------------------------------ 78 79void stream::initiate_read( 80 boost::shared_ptr<state> const& in_, 81 std::unique_ptr<stream::read_op_base>&& op, 82 std::size_t buf_size) 83{ 84 std::unique_lock<std::mutex> lock(in_->m); 85 86 ++in_->nread; 87 if(in_->op != nullptr) 88 BOOST_THROW_EXCEPTION( 89 std::logic_error{"in_->op != nullptr"}); 90 91 // test failure 92 error_code ec; 93 if(in_->fc && in_->fc->fail(ec)) 94 { 95 lock.unlock(); 96 (*op)(ec); 97 return; 98 } 99 100 // A request to read 0 bytes from a stream is a no-op. 101 if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0) 102 { 103 lock.unlock(); 104 (*op)(ec); 105 return; 106 } 107 108 // deliver error 109 if(in_->code != status::ok) 110 { 111 lock.unlock(); 112 (*op)(net::error::eof); 113 return; 114 } 115 116 // complete when bytes available or closed 117 in_->op = std::move(op); 118} 119 120stream:: 121state:: 122state( 123 net::io_context& ioc_, 124 boost::weak_ptr<service_impl> wp_, 125 fail_count* fc_) 126 : ioc(ioc_) 127 , wp(std::move(wp_)) 128 , fc(fc_) 129{ 130} 131 132stream:: 133state:: 134~state() 135{ 136 // cancel outstanding read 137 if(op != nullptr) 138 (*op)(net::error::operation_aborted); 139} 140 141void 142stream:: 143state:: 144remove() noexcept 145{ 146 auto sp = wp.lock(); 147 148 // If this goes off, it means the lifetime of a test::stream object 149 // extended beyond the lifetime of the associated execution context. 150 BOOST_ASSERT(sp); 151 152 sp->remove(*this); 153} 154 155void 156stream:: 157state:: 158notify_read() 159{ 160 if(op) 161 { 162 auto op_ = std::move(op); 163 op_->operator()(error_code{}); 164 } 165 else 166 { 167 cv.notify_all(); 168 } 169} 170 171void 172stream:: 173state:: 174cancel_read() 175{ 176 std::unique_ptr<read_op_base> p; 177 { 178 std::lock_guard<std::mutex> lock(m); 179 code = status::eof; 180 p = std::move(op); 181 } 182 if(p != nullptr) 183 (*p)(net::error::operation_aborted); 184} 185 186//------------------------------------------------------------------------------ 187 188stream:: 189~stream() 190{ 191 close(); 192 in_->remove(); 193} 194 195stream:: 196stream(stream&& other) 197{ 198 auto in = service::make_impl( 199 other.in_->ioc, other.in_->fc); 200 in_ = std::move(other.in_); 201 out_ = std::move(other.out_); 202 other.in_ = in; 203} 204 205stream& 206stream:: 207operator=(stream&& other) 208{ 209 close(); 210 auto in = service::make_impl( 211 other.in_->ioc, other.in_->fc); 212 in_->remove(); 213 in_ = std::move(other.in_); 214 out_ = std::move(other.out_); 215 other.in_ = in; 216 return *this; 217} 218 219//------------------------------------------------------------------------------ 220 221stream:: 222stream(net::io_context& ioc) 223 : in_(service::make_impl(ioc, nullptr)) 224{ 225} 226 227stream:: 228stream( 229 net::io_context& ioc, 230 fail_count& fc) 231 : in_(service::make_impl(ioc, &fc)) 232{ 233} 234 235stream:: 236stream( 237 net::io_context& ioc, 238 string_view s) 239 : in_(service::make_impl(ioc, nullptr)) 240{ 241 in_->b.commit(net::buffer_copy( 242 in_->b.prepare(s.size()), 243 net::buffer(s.data(), s.size()))); 244} 245 246stream:: 247stream( 248 net::io_context& ioc, 249 fail_count& fc, 250 string_view s) 251 : in_(service::make_impl(ioc, &fc)) 252{ 253 in_->b.commit(net::buffer_copy( 254 in_->b.prepare(s.size()), 255 net::buffer(s.data(), s.size()))); 256} 257 258void 259stream:: 260connect(stream& remote) 261{ 262 BOOST_ASSERT(! out_.lock()); 263 BOOST_ASSERT(! remote.out_.lock()); 264 std::lock(in_->m, remote.in_->m); 265 std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock}; 266 std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock}; 267 out_ = remote.in_; 268 remote.out_ = in_; 269 in_->code = status::ok; 270 remote.in_->code = status::ok; 271} 272 273string_view 274stream:: 275str() const 276{ 277 auto const bs = in_->b.data(); 278 if(buffer_bytes(bs) == 0) 279 return {}; 280 net::const_buffer const b = *net::buffer_sequence_begin(bs); 281 return {static_cast<char const*>(b.data()), b.size()}; 282} 283 284void 285stream:: 286append(string_view s) 287{ 288 std::lock_guard<std::mutex> lock{in_->m}; 289 in_->b.commit(net::buffer_copy( 290 in_->b.prepare(s.size()), 291 net::buffer(s.data(), s.size()))); 292} 293 294void 295stream:: 296clear() 297{ 298 std::lock_guard<std::mutex> lock{in_->m}; 299 in_->b.consume(in_->b.size()); 300} 301 302void 303stream:: 304close() 305{ 306 in_->cancel_read(); 307 308 // disconnect 309 { 310 auto out = out_.lock(); 311 out_.reset(); 312 313 // notify peer 314 if(out) 315 { 316 std::lock_guard<std::mutex> lock(out->m); 317 if(out->code == status::ok) 318 { 319 out->code = status::eof; 320 out->notify_read(); 321 } 322 } 323 } 324} 325 326void 327stream:: 328close_remote() 329{ 330 std::lock_guard<std::mutex> lock{in_->m}; 331 if(in_->code == status::ok) 332 { 333 in_->code = status::eof; 334 in_->notify_read(); 335 } 336} 337 338void 339teardown( 340 role_type, 341 stream& s, 342 boost::system::error_code& ec) 343{ 344 if( s.in_->fc && 345 s.in_->fc->fail(ec)) 346 return; 347 348 s.close(); 349 350 if( s.in_->fc && 351 s.in_->fc->fail(ec)) 352 ec = net::error::eof; 353 else 354 ec = {}; 355} 356 357//------------------------------------------------------------------------------ 358 359stream 360connect(stream& to) 361{ 362#if defined(BOOST_ASIO_NO_TS_EXECUTORS) 363 stream from{net::query(to.get_executor(), net::execution::context)}; 364#else // defined(BOOST_ASIO_NO_TS_EXECUTORS) 365 stream from{to.get_executor().context()}; 366#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS) 367 from.connect(to); 368 return from; 369} 370 371void 372connect(stream& s1, stream& s2) 373{ 374 s1.connect(s2); 375} 376 377} // test 378} // beast 379} // boost 380 381#endif 382