1 /*
2 Copyright (c) 2007-2010 iMatix Corporation
3
4 This file is part of 0MQ.
5
6 0MQ is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 0MQ is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include <string.h>
21
22 #include "zmq_init.hpp"
23 #include "transient_session.hpp"
24 #include "named_session.hpp"
25 #include "socket_base.hpp"
26 #include "zmq_engine.hpp"
27 #include "io_thread.hpp"
28 #include "session.hpp"
29 #include "uuid.hpp"
30 #include "blob.hpp"
31 #include "err.hpp"
32
zmq_init_t(io_thread_t * io_thread_,socket_base_t * socket_,session_t * session_,fd_t fd_,const options_t & options_)33 zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
34 socket_base_t *socket_, session_t *session_, fd_t fd_,
35 const options_t &options_) :
36 own_t (io_thread_, options_),
37 ephemeral_engine (NULL),
38 sent (false),
39 received (false),
40 socket (socket_),
41 session (session_),
42 io_thread (io_thread_)
43 {
44 // Create the engine object for this connection.
45 engine = new (std::nothrow) zmq_engine_t (fd_, options);
46 alloc_assert (engine);
47 }
48
~zmq_init_t()49 zmq::zmq_init_t::~zmq_init_t ()
50 {
51 if (engine)
52 engine->terminate ();
53 }
54
read(::zmq_msg_t * msg_)55 bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
56 {
57 // If the identity was already sent, do nothing.
58 if (sent)
59 return false;
60
61 // Send the identity.
62 int rc = zmq_msg_init_size (msg_, options.identity.size ());
63 zmq_assert (rc == 0);
64 memcpy (zmq_msg_data (msg_), options.identity.c_str (),
65 options.identity.size ());
66 sent = true;
67
68 // Try finalize initialization.
69 finalise_initialisation ();
70
71 return true;
72 }
73
write(::zmq_msg_t * msg_)74 bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
75 {
76 // If identity was already received, we are not interested
77 // in subsequent messages.
78 if (received)
79 return false;
80
81 // Retreieve the remote identity. If it's empty, generate a unique name.
82 if (!zmq_msg_size (msg_)) {
83 unsigned char identity [uuid_t::uuid_blob_len + 1];
84 identity [0] = 0;
85 memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
86 peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
87 }
88 else {
89 peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
90 zmq_msg_size (msg_));
91 }
92 int rc = zmq_msg_close (msg_);
93 zmq_assert (rc == 0);
94
95 received = true;
96
97 // Try finalize initialization.
98 finalise_initialisation ();
99
100 return true;
101 }
102
flush()103 void zmq::zmq_init_t::flush ()
104 {
105 // Check if there's anything to flush.
106 if (!received)
107 return;
108
109 // Initialization is done, dispatch engine.
110 if (ephemeral_engine)
111 dispatch_engine ();
112 }
113
detach()114 void zmq::zmq_init_t::detach ()
115 {
116 // This function is called by engine when disconnection occurs.
117
118 // If there is an associated session, send it a null engine to let it know
119 // that connection process was unsuccesful.
120 if (session)
121 send_attach (session, NULL, blob_t (), true);
122
123 // The engine will destroy itself, so let's just drop the pointer here and
124 // start termination of the init object.
125 engine = NULL;
126 terminate ();
127 }
128
process_plug()129 void zmq::zmq_init_t::process_plug ()
130 {
131 zmq_assert (engine);
132 engine->plug (io_thread, this);
133 }
134
process_unplug()135 void zmq::zmq_init_t::process_unplug ()
136 {
137 if (engine)
138 engine->unplug ();
139 }
140
finalise_initialisation()141 void zmq::zmq_init_t::finalise_initialisation ()
142 {
143 // Unplug and prepare to dispatch engine.
144 if (sent && received) {
145 ephemeral_engine = engine;
146 engine = NULL;
147 ephemeral_engine->unplug ();
148 return;
149 }
150 }
151
dispatch_engine()152 void zmq::zmq_init_t::dispatch_engine ()
153 {
154 if (sent && received) {
155
156 // Engine must be detached.
157 zmq_assert (!engine);
158 zmq_assert (ephemeral_engine);
159
160 // If we know what session we belong to, it's easy, just send the
161 // engine to that session and destroy the init object. Note that we
162 // know about the session only if this object is owned by it. Thus,
163 // lifetime of this object in contained in the lifetime of the session
164 // so the pointer cannot become invalid without notice.
165 if (session) {
166 send_attach (session, ephemeral_engine, peer_identity, true);
167 terminate ();
168 return;
169 }
170
171 // All the cases below are listener-based. Therefore we need the socket
172 // reference so that new sessions can bind to that socket.
173 zmq_assert (socket);
174
175 // We have no associated session. If the peer has no identity we'll
176 // create a transient session for the connection. Note that
177 // seqnum is incremented to account for attach command before the
178 // session is launched. That way we are sure it won't terminate before
179 // being attached.
180 if (peer_identity [0] == 0) {
181 session = new (std::nothrow) transient_session_t (io_thread,
182 socket, options);
183 alloc_assert (session);
184 session->inc_seqnum ();
185 launch_sibling (session);
186 send_attach (session, ephemeral_engine, peer_identity, false);
187 terminate ();
188 return;
189 }
190
191 // Try to find the session corresponding to the peer's identity.
192 // If found, send the engine to that session and destroy this object.
193 // Note that session's seqnum is incremented by find_session rather
194 // than by send_attach.
195 session = socket->find_session (peer_identity);
196 if (session) {
197 send_attach (session, ephemeral_engine, peer_identity, false);
198 terminate ();
199 return;
200 }
201
202 // There's no such named session. We have to create one. Note that
203 // seqnum is incremented to account for attach command before the
204 // session is launched. That way we are sure it won't terminate before
205 // being attached.
206 session = new (std::nothrow) named_session_t (io_thread, socket,
207 options, peer_identity);
208 alloc_assert (session);
209 session->inc_seqnum ();
210 launch_sibling (session);
211 send_attach (session, ephemeral_engine, peer_identity, false);
212 terminate ();
213 return;
214 }
215 }
216