1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_TEST_IMPL_STREAM_IPP
11#define BOOST_BEAST_TEST_IMPL_STREAM_IPP
12
13#include <boost/beast/_experimental/test/stream.hpp>
14#include <boost/beast/core/bind_handler.hpp>
15#include <boost/beast/core/buffer_traits.hpp>
16#include <boost/make_shared.hpp>
17#include <stdexcept>
18#include <vector>
19
20namespace boost {
21namespace beast {
22namespace test {
23
24//------------------------------------------------------------------------------
25
26stream::
27service::
28service(net::execution_context& ctx)
29    : beast::detail::service_base<service>(ctx)
30    , sp_(boost::make_shared<service_impl>())
31{
32}
33
34void
35stream::
36service::
37shutdown()
38{
39    std::vector<std::unique_ptr<read_op_base>> v;
40    std::lock_guard<std::mutex> g1(sp_->m_);
41    v.reserve(sp_->v_.size());
42    for(auto p : sp_->v_)
43    {
44        std::lock_guard<std::mutex> g2(p->m);
45        v.emplace_back(std::move(p->op));
46        p->code = status::eof;
47    }
48}
49
50auto
51stream::
52service::
53make_impl(
54    net::io_context& ctx,
55    test::fail_count* fc) ->
56    boost::shared_ptr<state>
57{
58    auto& svc = net::use_service<service>(ctx);
59    auto sp = boost::make_shared<state>(ctx, svc.sp_, fc);
60    std::lock_guard<std::mutex> g(svc.sp_->m_);
61    svc.sp_->v_.push_back(sp.get());
62    return sp;
63}
64
65void
66stream::
67service_impl::
68remove(state& impl)
69{
70    std::lock_guard<std::mutex> g(m_);
71    *std::find(
72        v_.begin(), v_.end(),
73            &impl) = std::move(v_.back());
74    v_.pop_back();
75}
76
77//------------------------------------------------------------------------------
78
79void stream::initiate_read(
80    boost::shared_ptr<state> const& in_,
81    std::unique_ptr<stream::read_op_base>&& op,
82    std::size_t buf_size)
83{
84    std::unique_lock<std::mutex> lock(in_->m);
85
86    ++in_->nread;
87    if(in_->op != nullptr)
88        BOOST_THROW_EXCEPTION(
89            std::logic_error{"in_->op != nullptr"});
90
91    // test failure
92    error_code ec;
93    if(in_->fc && in_->fc->fail(ec))
94    {
95        lock.unlock();
96        (*op)(ec);
97        return;
98    }
99
100    // A request to read 0 bytes from a stream is a no-op.
101    if(buf_size == 0 || buffer_bytes(in_->b.data()) > 0)
102    {
103        lock.unlock();
104        (*op)(ec);
105        return;
106    }
107
108    // deliver error
109    if(in_->code != status::ok)
110    {
111        lock.unlock();
112        (*op)(net::error::eof);
113        return;
114    }
115
116    // complete when bytes available or closed
117    in_->op = std::move(op);
118}
119
120stream::
121state::
122state(
123    net::io_context& ioc_,
124    boost::weak_ptr<service_impl> wp_,
125    fail_count* fc_)
126    : ioc(ioc_)
127    , wp(std::move(wp_))
128    , fc(fc_)
129{
130}
131
132stream::
133state::
134~state()
135{
136    // cancel outstanding read
137    if(op != nullptr)
138        (*op)(net::error::operation_aborted);
139}
140
141void
142stream::
143state::
144remove() noexcept
145{
146    auto sp = wp.lock();
147
148    // If this goes off, it means the lifetime of a test::stream object
149    // extended beyond the lifetime of the associated execution context.
150    BOOST_ASSERT(sp);
151
152    sp->remove(*this);
153}
154
155void
156stream::
157state::
158notify_read()
159{
160    if(op)
161    {
162        auto op_ = std::move(op);
163        op_->operator()(error_code{});
164    }
165    else
166    {
167        cv.notify_all();
168    }
169}
170
171void
172stream::
173state::
174cancel_read()
175{
176    std::unique_ptr<read_op_base> p;
177    {
178        std::lock_guard<std::mutex> lock(m);
179        code = status::eof;
180        p = std::move(op);
181    }
182    if(p != nullptr)
183        (*p)(net::error::operation_aborted);
184}
185
186//------------------------------------------------------------------------------
187
188stream::
189~stream()
190{
191    close();
192    in_->remove();
193}
194
195stream::
196stream(stream&& other)
197{
198    auto in = service::make_impl(
199        other.in_->ioc, other.in_->fc);
200    in_ = std::move(other.in_);
201    out_ = std::move(other.out_);
202    other.in_ = in;
203}
204
205stream&
206stream::
207operator=(stream&& other)
208{
209    close();
210    auto in = service::make_impl(
211        other.in_->ioc, other.in_->fc);
212    in_->remove();
213    in_ = std::move(other.in_);
214    out_ = std::move(other.out_);
215    other.in_ = in;
216    return *this;
217}
218
219//------------------------------------------------------------------------------
220
221stream::
222stream(net::io_context& ioc)
223    : in_(service::make_impl(ioc, nullptr))
224{
225}
226
227stream::
228stream(
229    net::io_context& ioc,
230    fail_count& fc)
231    : in_(service::make_impl(ioc, &fc))
232{
233}
234
235stream::
236stream(
237    net::io_context& ioc,
238    string_view s)
239    : in_(service::make_impl(ioc, nullptr))
240{
241    in_->b.commit(net::buffer_copy(
242        in_->b.prepare(s.size()),
243        net::buffer(s.data(), s.size())));
244}
245
246stream::
247stream(
248    net::io_context& ioc,
249    fail_count& fc,
250    string_view s)
251    : in_(service::make_impl(ioc, &fc))
252{
253    in_->b.commit(net::buffer_copy(
254        in_->b.prepare(s.size()),
255        net::buffer(s.data(), s.size())));
256}
257
258void
259stream::
260connect(stream& remote)
261{
262    BOOST_ASSERT(! out_.lock());
263    BOOST_ASSERT(! remote.out_.lock());
264    std::lock(in_->m, remote.in_->m);
265    std::lock_guard<std::mutex> guard1{in_->m, std::adopt_lock};
266    std::lock_guard<std::mutex> guard2{remote.in_->m, std::adopt_lock};
267    out_ = remote.in_;
268    remote.out_ = in_;
269    in_->code = status::ok;
270    remote.in_->code = status::ok;
271}
272
273string_view
274stream::
275str() const
276{
277    auto const bs = in_->b.data();
278    if(buffer_bytes(bs) == 0)
279        return {};
280    net::const_buffer const b = *net::buffer_sequence_begin(bs);
281    return {static_cast<char const*>(b.data()), b.size()};
282}
283
284void
285stream::
286append(string_view s)
287{
288    std::lock_guard<std::mutex> lock{in_->m};
289    in_->b.commit(net::buffer_copy(
290        in_->b.prepare(s.size()),
291        net::buffer(s.data(), s.size())));
292}
293
294void
295stream::
296clear()
297{
298    std::lock_guard<std::mutex> lock{in_->m};
299    in_->b.consume(in_->b.size());
300}
301
302void
303stream::
304close()
305{
306    in_->cancel_read();
307
308    // disconnect
309    {
310        auto out = out_.lock();
311        out_.reset();
312
313        // notify peer
314        if(out)
315        {
316            std::lock_guard<std::mutex> lock(out->m);
317            if(out->code == status::ok)
318            {
319                out->code = status::eof;
320                out->notify_read();
321            }
322        }
323    }
324}
325
326void
327stream::
328close_remote()
329{
330    std::lock_guard<std::mutex> lock{in_->m};
331    if(in_->code == status::ok)
332    {
333        in_->code = status::eof;
334        in_->notify_read();
335    }
336}
337
338void
339teardown(
340    role_type,
341    stream& s,
342    boost::system::error_code& ec)
343{
344    if( s.in_->fc &&
345        s.in_->fc->fail(ec))
346        return;
347
348    s.close();
349
350    if( s.in_->fc &&
351        s.in_->fc->fail(ec))
352        ec = net::error::eof;
353    else
354        ec = {};
355}
356
357//------------------------------------------------------------------------------
358
359stream
360connect(stream& to)
361{
362#if defined(BOOST_ASIO_NO_TS_EXECUTORS)
363    stream from{net::query(to.get_executor(), net::execution::context)};
364#else // defined(BOOST_ASIO_NO_TS_EXECUTORS)
365    stream from{to.get_executor().context()};
366#endif // defined(BOOST_ASIO_NO_TS_EXECUTORS)
367    from.connect(to);
368    return from;
369}
370
371void
372connect(stream& s1, stream& s2)
373{
374    s1.connect(s2);
375}
376
377} // test
378} // beast
379} // boost
380
381#endif
382