1 /*
2 Copyright (c) 2007-2011 iMatix Corporation
3 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4
5 This file is part of 0MQ.
6
7 0MQ is free software; you can redistribute it and/or modify it under
8 the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
11
12 0MQ is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include <string.h>
22
23 #include "zmq_init.hpp"
24 #include "transient_session.hpp"
25 #include "named_session.hpp"
26 #include "socket_base.hpp"
27 #include "zmq_engine.hpp"
28 #include "io_thread.hpp"
29 #include "session.hpp"
30 #include "uuid.hpp"
31 #include "blob.hpp"
32 #include "err.hpp"
33
zmq_init_t(io_thread_t * io_thread_,socket_base_t * socket_,session_t * session_,fd_t fd_,const options_t & options_)34 zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
35 socket_base_t *socket_, session_t *session_, fd_t fd_,
36 const options_t &options_) :
37 own_t (io_thread_, options_),
38 ephemeral_engine (NULL),
39 sent (false),
40 received (false),
41 socket (socket_),
42 session (session_),
43 io_thread (io_thread_)
44 {
45 // Create the engine object for this connection.
46 engine = new (std::nothrow) zmq_engine_t (fd_, options);
47 alloc_assert (engine);
48 }
49
~zmq_init_t()50 zmq::zmq_init_t::~zmq_init_t ()
51 {
52 if (engine)
53 engine->terminate ();
54 }
55
read(::zmq_msg_t * msg_)56 bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
57 {
58 // If the identity was already sent, do nothing.
59 if (sent)
60 return false;
61
62 // Send the identity.
63 int rc = zmq_msg_init_size (msg_, options.identity.size ());
64 zmq_assert (rc == 0);
65 memcpy (zmq_msg_data (msg_), options.identity.c_str (),
66 options.identity.size ());
67 sent = true;
68
69 // Try finalize initialization.
70 finalise_initialisation ();
71
72 return true;
73 }
74
write(::zmq_msg_t * msg_)75 bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
76 {
77 // If identity was already received, we are not interested
78 // in subsequent messages.
79 if (received)
80 return false;
81
82 // Retreieve the remote identity. If it's empty, generate a unique name.
83 if (!zmq_msg_size (msg_)) {
84 unsigned char identity [uuid_t::uuid_blob_len + 1];
85 identity [0] = 0;
86 memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
87 peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
88 }
89 else {
90 peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
91 zmq_msg_size (msg_));
92 }
93 int rc = zmq_msg_close (msg_);
94 zmq_assert (rc == 0);
95
96 received = true;
97
98 // Try finalize initialization.
99 finalise_initialisation ();
100
101 return true;
102 }
103
flush()104 void zmq::zmq_init_t::flush ()
105 {
106 // Check if there's anything to flush.
107 if (!received)
108 return;
109
110 // Initialization is done, dispatch engine.
111 if (ephemeral_engine)
112 dispatch_engine ();
113 }
114
detach()115 void zmq::zmq_init_t::detach ()
116 {
117 // This function is called by engine when disconnection occurs.
118
119 // If there is an associated session, send it a null engine to let it know
120 // that connection process was unsuccesful.
121 if (session)
122 send_attach (session, NULL, blob_t (), true);
123
124 // The engine will destroy itself, so let's just drop the pointer here and
125 // start termination of the init object.
126 engine = NULL;
127 terminate ();
128 }
129
process_plug()130 void zmq::zmq_init_t::process_plug ()
131 {
132 zmq_assert (engine);
133 engine->plug (io_thread, this);
134 }
135
process_unplug()136 void zmq::zmq_init_t::process_unplug ()
137 {
138 if (engine)
139 engine->unplug ();
140 }
141
finalise_initialisation()142 void zmq::zmq_init_t::finalise_initialisation ()
143 {
144 // Unplug and prepare to dispatch engine.
145 if (sent && received) {
146 ephemeral_engine = engine;
147 engine = NULL;
148 ephemeral_engine->unplug ();
149 return;
150 }
151 }
152
dispatch_engine()153 void zmq::zmq_init_t::dispatch_engine ()
154 {
155 if (sent && received) {
156
157 // Engine must be detached.
158 zmq_assert (!engine);
159 zmq_assert (ephemeral_engine);
160
161 // If we know what session we belong to, it's easy, just send the
162 // engine to that session and destroy the init object. Note that we
163 // know about the session only if this object is owned by it. Thus,
164 // lifetime of this object in contained in the lifetime of the session
165 // so the pointer cannot become invalid without notice.
166 if (session) {
167 send_attach (session, ephemeral_engine, peer_identity, true);
168 terminate ();
169 return;
170 }
171
172 // All the cases below are listener-based. Therefore we need the socket
173 // reference so that new sessions can bind to that socket.
174 zmq_assert (socket);
175
176 // We have no associated session. If the peer has no identity we'll
177 // create a transient session for the connection. Note that
178 // seqnum is incremented to account for attach command before the
179 // session is launched. That way we are sure it won't terminate before
180 // being attached.
181 if (peer_identity [0] == 0) {
182 session = new (std::nothrow) transient_session_t (io_thread,
183 socket, options);
184 alloc_assert (session);
185 session->inc_seqnum ();
186 launch_sibling (session);
187 send_attach (session, ephemeral_engine, peer_identity, false);
188 terminate ();
189 return;
190 }
191
192 // Try to find the session corresponding to the peer's identity.
193 // If found, send the engine to that session and destroy this object.
194 // Note that session's seqnum is incremented by find_session rather
195 // than by send_attach.
196 session = socket->find_session (peer_identity);
197 if (session) {
198 send_attach (session, ephemeral_engine, peer_identity, false);
199 terminate ();
200 return;
201 }
202
203 // There's no such named session. We have to create one. Note that
204 // seqnum is incremented to account for attach command before the
205 // session is launched. That way we are sure it won't terminate before
206 // being attached.
207 session = new (std::nothrow) named_session_t (io_thread, socket,
208 options, peer_identity);
209 alloc_assert (session);
210 session->inc_seqnum ();
211 launch_sibling (session);
212 send_attach (session, ephemeral_engine, peer_identity, false);
213 terminate ();
214 return;
215 }
216 }
217