1 #include "rpc/detail/server_session.h"
2 
3 #include "rpc/server.h"
4 #include "rpc/this_handler.h"
5 #include "rpc/this_server.h"
6 #include "rpc/this_session.h"
7 #include "rpc/config.h"
8 
9 #include "rpc/detail/log.h"
10 
11 namespace rpc {
12 namespace detail {
13 
14 static constexpr std::size_t default_buffer_size = rpc::constants::DEFAULT_BUFFER_SIZE;
15 
server_session(server * srv,boost::asio::io_service * io,boost::asio::ip::tcp::socket socket,std::shared_ptr<dispatcher> disp,bool suppress_exceptions)16 server_session::server_session(server *srv, boost::asio::io_service *io,
17                                boost::asio::ip::tcp::socket socket,
18                                std::shared_ptr<dispatcher> disp,
19                                bool suppress_exceptions)
20     : async_writer(io, std::move(socket)),
21       parent_(srv),
22       io_(io),
23       read_strand_(*io),
24       disp_(disp),
25       pac_(),
26       suppress_exceptions_(suppress_exceptions) {
27     pac_.reserve_buffer(default_buffer_size); // TODO: make this configurable
28                                               // [sztomi 2016-01-13]
29 }
30 
start()31 void server_session::start() { do_read(); }
32 
close()33 void server_session::close() {
34     LOG_INFO("Closing session.");
35     exit_ = true;
36     write_strand_.post([this]() { socket_.close(); });
37 }
38 
do_read()39 void server_session::do_read() {
40     auto self(shared_from_this());
41     constexpr std::size_t max_read_bytes = default_buffer_size;
42     socket_.async_read_some(
43         boost::asio::buffer(pac_.buffer(), default_buffer_size),
44         // I don't think max_read_bytes needs to be captured explicitly
45         // (since it's constexpr), but MSVC insists.
46         read_strand_.wrap([this, self, max_read_bytes](boost::system::error_code ec,
47                                                        std::size_t length) {
48             if (!ec) {
49                 pac_.buffer_consumed(length);
50                 RPCLIB_MSGPACK::unpacked result;
51                 while (pac_.next(&result) && !exit_) {
52                     auto msg = result.get();
53                     output_buf_.clear();
54 
55                     // any worker thread can take this call
56                     auto z = std::shared_ptr<RPCLIB_MSGPACK::zone>(result.zone().release());
57                     io_->post([
58                         this, msg, z
59                     ]() {
60                         this_handler().clear();
61                         this_session().clear();
62                         this_server().cancel_stop();
63 
64                         auto resp = disp_->dispatch(msg, suppress_exceptions_);
65 
66                         // There are various things that decide what to send
67                         // as a response. They have a precedence.
68 
69                         // First, if the response is disabled, that wins
70                         // So You Get Nothing, You Lose! Good Day Sir!
71                         if (!this_handler().resp_enabled_) {
72                             return;
73                         }
74 
75                         // Second, if there is an error set, we send that
76                         // and only third, if there is a special response, we
77                         // use it
78                         if (!this_handler().error_.get().is_nil()) {
79                             LOG_WARN("There was an error set in the handler");
80                             resp.capture_error(this_handler().error_);
81                         } else if (!this_handler().resp_.get().is_nil()) {
82                             LOG_WARN("There was a special result set in the "
83                                      "handler");
84                             resp.capture_result(this_handler().resp_);
85                         }
86 
87                         if (!resp.is_empty()) {
88 #ifdef _MSC_VER
89                             // doesn't compile otherwise.
90                             write_strand_.post(
91                                 [=]() { write(resp.get_data()); });
92 #else
93                             write_strand_.post(
94                                 [this, resp, z]() { write(resp.get_data()); });
95 #endif
96                         }
97 
98                         if (this_session().exit_) {
99                             LOG_WARN("Session exit requested from a handler.");
100                             // posting through the strand so this comes after
101                             // the previous write
102                             write_strand_.post([this]() { exit_ = true; });
103                         }
104 
105                         if (this_server().stopping_) {
106                             LOG_WARN("Server exit requested from a handler.");
107                             // posting through the strand so this comes after
108                             // the previous write
109                             write_strand_.post(
110                                 [this]() { parent_->close_sessions(); });
111                         }
112                     });
113                 }
114 
115                 if (!exit_) {
116                     // resizing strategy: if the remaining buffer size is
117                     // less than the maximum bytes requested from asio,
118                     // then request max_read_bytes. This prompts the unpacker
119                     // to resize its buffer doubling its size
120                     // (https://github.com/msgpack/msgpack-c/issues/567#issuecomment-280810018)
121                     if (pac_.buffer_capacity() < max_read_bytes) {
122                         LOG_TRACE("Reserving extra buffer: {}", max_read_bytes);
123                         pac_.reserve_buffer(max_read_bytes);
124                     }
125                     do_read();
126                 }
127             }
128         }));
129     if (exit_) {
130         socket_.close();
131     }
132 }
133 
134 } /* detail */
135 } /* rpc */
136