1 #include "rpc/detail/server_session.h"
2
3 #include "rpc/server.h"
4 #include "rpc/this_handler.h"
5 #include "rpc/this_server.h"
6 #include "rpc/this_session.h"
7 #include "rpc/config.h"
8
9 #include "rpc/detail/log.h"
10
11 namespace rpc {
12 namespace detail {
13
14 static constexpr std::size_t default_buffer_size = rpc::constants::DEFAULT_BUFFER_SIZE;
15
server_session(server * srv,boost::asio::io_service * io,boost::asio::ip::tcp::socket socket,std::shared_ptr<dispatcher> disp,bool suppress_exceptions)16 server_session::server_session(server *srv, boost::asio::io_service *io,
17 boost::asio::ip::tcp::socket socket,
18 std::shared_ptr<dispatcher> disp,
19 bool suppress_exceptions)
20 : async_writer(io, std::move(socket)),
21 parent_(srv),
22 io_(io),
23 read_strand_(*io),
24 disp_(disp),
25 pac_(),
26 suppress_exceptions_(suppress_exceptions) {
27 pac_.reserve_buffer(default_buffer_size); // TODO: make this configurable
28 // [sztomi 2016-01-13]
29 }
30
start()31 void server_session::start() { do_read(); }
32
close()33 void server_session::close() {
34 LOG_INFO("Closing session.");
35 exit_ = true;
36 write_strand_.post([this]() { socket_.close(); });
37 }
38
do_read()39 void server_session::do_read() {
40 auto self(shared_from_this());
41 constexpr std::size_t max_read_bytes = default_buffer_size;
42 socket_.async_read_some(
43 boost::asio::buffer(pac_.buffer(), default_buffer_size),
44 // I don't think max_read_bytes needs to be captured explicitly
45 // (since it's constexpr), but MSVC insists.
46 read_strand_.wrap([this, self, max_read_bytes](boost::system::error_code ec,
47 std::size_t length) {
48 if (!ec) {
49 pac_.buffer_consumed(length);
50 RPCLIB_MSGPACK::unpacked result;
51 while (pac_.next(&result) && !exit_) {
52 auto msg = result.get();
53 output_buf_.clear();
54
55 // any worker thread can take this call
56 auto z = std::shared_ptr<RPCLIB_MSGPACK::zone>(result.zone().release());
57 io_->post([
58 this, msg, z
59 ]() {
60 this_handler().clear();
61 this_session().clear();
62 this_server().cancel_stop();
63
64 auto resp = disp_->dispatch(msg, suppress_exceptions_);
65
66 // There are various things that decide what to send
67 // as a response. They have a precedence.
68
69 // First, if the response is disabled, that wins
70 // So You Get Nothing, You Lose! Good Day Sir!
71 if (!this_handler().resp_enabled_) {
72 return;
73 }
74
75 // Second, if there is an error set, we send that
76 // and only third, if there is a special response, we
77 // use it
78 if (!this_handler().error_.get().is_nil()) {
79 LOG_WARN("There was an error set in the handler");
80 resp.capture_error(this_handler().error_);
81 } else if (!this_handler().resp_.get().is_nil()) {
82 LOG_WARN("There was a special result set in the "
83 "handler");
84 resp.capture_result(this_handler().resp_);
85 }
86
87 if (!resp.is_empty()) {
88 #ifdef _MSC_VER
89 // doesn't compile otherwise.
90 write_strand_.post(
91 [=]() { write(resp.get_data()); });
92 #else
93 write_strand_.post(
94 [this, resp, z]() { write(resp.get_data()); });
95 #endif
96 }
97
98 if (this_session().exit_) {
99 LOG_WARN("Session exit requested from a handler.");
100 // posting through the strand so this comes after
101 // the previous write
102 write_strand_.post([this]() { exit_ = true; });
103 }
104
105 if (this_server().stopping_) {
106 LOG_WARN("Server exit requested from a handler.");
107 // posting through the strand so this comes after
108 // the previous write
109 write_strand_.post(
110 [this]() { parent_->close_sessions(); });
111 }
112 });
113 }
114
115 if (!exit_) {
116 // resizing strategy: if the remaining buffer size is
117 // less than the maximum bytes requested from asio,
118 // then request max_read_bytes. This prompts the unpacker
119 // to resize its buffer doubling its size
120 // (https://github.com/msgpack/msgpack-c/issues/567#issuecomment-280810018)
121 if (pac_.buffer_capacity() < max_read_bytes) {
122 LOG_TRACE("Reserving extra buffer: {}", max_read_bytes);
123 pac_.reserve_buffer(max_read_bytes);
124 }
125 do_read();
126 }
127 }
128 }));
129 if (exit_) {
130 socket_.close();
131 }
132 }
133
134 } /* detail */
135 } /* rpc */
136