1 /*
2     Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3 
4     This file is part of libzmq, the ZeroMQ core engine in C++.
5 
6     libzmq is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License (LGPL) as published
8     by the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     As a special exception, the Contributors give you permission to link
12     this library with independent modules to produce an executable,
13     regardless of the license terms of these independent modules, and to
14     copy and distribute the resulting executable under terms of your choice,
15     provided that you also meet, for each linked independent module, the
16     terms and conditions of the license of that module. An independent
17     module is a module which is not derived from or based on this library.
18     If you modify this library, you must extend this exception to your
19     version of the library.
20 
21     libzmq is distributed in the hope that it will be useful, but WITHOUT
22     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23     FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24     License for more details.
25 
26     You should have received a copy of the GNU Lesser General Public License
27     along with this program.  If not, see <http://www.gnu.org/licenses/>.
28 */
29 
30 #ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
31 #define __ZMQ_SESSION_BASE_HPP_INCLUDED__
32 
33 #include <stdarg.h>
34 
35 #include "own.hpp"
36 #include "io_object.hpp"
37 #include "pipe.hpp"
38 #include "socket_base.hpp"
39 #include "i_engine.hpp"
40 #include "msg.hpp"
41 
42 namespace zmq
43 {
44 class io_thread_t;
45 struct i_engine;
46 struct address_t;
47 
48 class session_base_t : public own_t, public io_object_t, public i_pipe_events
49 {
50   public:
51     //  Create a session of the particular type.
52     static session_base_t *create (zmq::io_thread_t *io_thread_,
53                                    bool active_,
54                                    zmq::socket_base_t *socket_,
55                                    const options_t &options_,
56                                    address_t *addr_);
57 
58     //  To be used once only, when creating the session.
59     void attach_pipe (zmq::pipe_t *pipe_);
60 
61     //  Following functions are the interface exposed towards the engine.
62     virtual void reset ();
63     void flush ();
64     void rollback ();
65     void engine_error (bool handshaked_, zmq::i_engine::error_reason_t reason_);
66     void engine_ready ();
67 
68     //  i_pipe_events interface implementation.
69     void read_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
70     void write_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
71     void hiccuped (zmq::pipe_t *pipe_) ZMQ_FINAL;
72     void pipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
73 
74     //  Delivers a message. Returns 0 if successful; -1 otherwise.
75     //  The function takes ownership of the message.
76     virtual int push_msg (msg_t *msg_);
77 
78     int zap_connect ();
79     bool zap_enabled () const;
80 
81     //  Fetches a message. Returns 0 if successful; -1 otherwise.
82     //  The caller is responsible for freeing the message when no
83     //  longer used.
84     virtual int pull_msg (msg_t *msg_);
85 
86     //  Receives message from ZAP socket.
87     //  Returns 0 on success; -1 otherwise.
88     //  The caller is responsible for freeing the message.
89     int read_zap_msg (msg_t *msg_);
90 
91     //  Sends message to ZAP socket.
92     //  Returns 0 on success; -1 otherwise.
93     //  The function takes ownership of the message.
94     int write_zap_msg (msg_t *msg_);
95 
96     socket_base_t *get_socket () const;
97     const endpoint_uri_pair_t &get_endpoint () const;
98 
99   protected:
100     session_base_t (zmq::io_thread_t *io_thread_,
101                     bool active_,
102                     zmq::socket_base_t *socket_,
103                     const options_t &options_,
104                     address_t *addr_);
105     ~session_base_t () ZMQ_OVERRIDE;
106 
107   private:
108     void start_connecting (bool wait_);
109 
110     void reconnect ();
111 
112     //  Handlers for incoming commands.
113     void process_plug () ZMQ_FINAL;
114     void process_attach (zmq::i_engine *engine_) ZMQ_FINAL;
115     void process_term (int linger_) ZMQ_FINAL;
116     void process_conn_failed ();
117 
118     //  i_poll_events handlers.
119     void timer_event (int id_) ZMQ_FINAL;
120 
121     //  Remove any half processed messages. Flush unflushed messages.
122     //  Call this function when engine disconnect to get rid of leftovers.
123     void clean_pipes ();
124 
125     //  If true, this session (re)connects to the peer. Otherwise, it's
126     //  a transient session created by the listener.
127     const bool _active;
128 
129     //  Pipe connecting the session to its socket.
130     zmq::pipe_t *_pipe;
131 
132     //  Pipe used to exchange messages with ZAP socket.
133     zmq::pipe_t *_zap_pipe;
134 
135     //  This set is added to with pipes we are disconnecting, but haven't yet completed
136     std::set<pipe_t *> _terminating_pipes;
137 
138     //  This flag is true if the remainder of the message being processed
139     //  is still in the in pipe.
140     bool _incomplete_in;
141 
142     //  True if termination have been suspended to push the pending
143     //  messages to the network.
144     bool _pending;
145 
146     //  The protocol I/O engine connected to the session.
147     zmq::i_engine *_engine;
148 
149     //  The socket the session belongs to.
150     zmq::socket_base_t *_socket;
151 
152     //  I/O thread the session is living in. It will be used to plug in
153     //  the engines into the same thread.
154     zmq::io_thread_t *_io_thread;
155 
156     //  ID of the linger timer
157     enum
158     {
159         linger_timer_id = 0x20
160     };
161 
162     //  True is linger timer is running.
163     bool _has_linger_timer;
164 
165     //  Protocol and address to use when connecting.
166     address_t *_addr;
167 
168 #ifdef ZMQ_HAVE_WSS
169     //  TLS handshake, we need to take a copy when the session is created,
170     //  in order to maintain the value at the creation time
171     const std::string _wss_hostname;
172 #endif
173 
174     ZMQ_NON_COPYABLE_NOR_MOVABLE (session_base_t)
175 };
176 
177 class hello_msg_session_t ZMQ_FINAL : public session_base_t
178 {
179   public:
180     hello_msg_session_t (zmq::io_thread_t *io_thread_,
181                          bool connect_,
182                          zmq::socket_base_t *socket_,
183                          const options_t &options_,
184                          address_t *addr_);
185     ~hello_msg_session_t ();
186 
187     //  Overrides of the functions from session_base_t.
188     int pull_msg (msg_t *msg_);
189     void reset ();
190 
191   private:
192     bool _new_pipe;
193 
194     ZMQ_NON_COPYABLE_NOR_MOVABLE (hello_msg_session_t)
195 };
196 }
197 
198 #endif
199