1 /*
2     Copyright (c) 2007-2011 iMatix Corporation
3     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4 
5     This file is part of 0MQ.
6 
7     0MQ is free software; you can redistribute it and/or modify it under
8     the terms of the GNU Lesser General Public License as published by
9     the Free Software Foundation; either version 3 of the License, or
10     (at your option) any later version.
11 
12     0MQ is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU Lesser General Public License for more details.
16 
17     You should have received a copy of the GNU Lesser General Public License
18     along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include <string.h>
22 
23 #include "zmq_init.hpp"
24 #include "transient_session.hpp"
25 #include "named_session.hpp"
26 #include "socket_base.hpp"
27 #include "zmq_engine.hpp"
28 #include "io_thread.hpp"
29 #include "session.hpp"
30 #include "uuid.hpp"
31 #include "blob.hpp"
32 #include "err.hpp"
33 
zmq_init_t(io_thread_t * io_thread_,socket_base_t * socket_,session_t * session_,fd_t fd_,const options_t & options_)34 zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
35       socket_base_t *socket_, session_t *session_, fd_t fd_,
36       const options_t &options_) :
37     own_t (io_thread_, options_),
38     ephemeral_engine (NULL),
39     sent (false),
40     received (false),
41     socket (socket_),
42     session (session_),
43     io_thread (io_thread_)
44 {
45     //  Create the engine object for this connection.
46     engine = new (std::nothrow) zmq_engine_t (fd_, options);
47     alloc_assert (engine);
48 }
49 
~zmq_init_t()50 zmq::zmq_init_t::~zmq_init_t ()
51 {
52     if (engine)
53         engine->terminate ();
54 }
55 
read(::zmq_msg_t * msg_)56 bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
57 {
58     //  If the identity was already sent, do nothing.
59     if (sent)
60         return false;
61 
62     //  Send the identity.
63     int rc = zmq_msg_init_size (msg_, options.identity.size ());
64     zmq_assert (rc == 0);
65     memcpy (zmq_msg_data (msg_), options.identity.c_str (),
66         options.identity.size ());
67     sent = true;
68 
69     //  Try finalize initialization.
70     finalise_initialisation ();
71 
72     return true;
73 }
74 
write(::zmq_msg_t * msg_)75 bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
76 {
77     //  If identity was already received, we are not interested
78     //  in subsequent messages.
79     if (received)
80         return false;
81 
82     //  Retreieve the remote identity. If it's empty, generate a unique name.
83     if (!zmq_msg_size (msg_)) {
84         unsigned char identity [uuid_t::uuid_blob_len + 1];
85         identity [0] = 0;
86         memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
87         peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
88     }
89     else {
90         peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
91             zmq_msg_size (msg_));
92     }
93     int rc = zmq_msg_close (msg_);
94     zmq_assert (rc == 0);
95 
96     received = true;
97 
98     //  Try finalize initialization.
99     finalise_initialisation ();
100 
101     return true;
102 }
103 
flush()104 void zmq::zmq_init_t::flush ()
105 {
106     //  Check if there's anything to flush.
107     if (!received)
108         return;
109 
110     //  Initialization is done, dispatch engine.
111     if (ephemeral_engine)
112         dispatch_engine ();
113 }
114 
detach()115 void zmq::zmq_init_t::detach ()
116 {
117     //  This function is called by engine when disconnection occurs.
118 
119     //  If there is an associated session, send it a null engine to let it know
120     //  that connection process was unsuccesful.
121     if (session)
122         send_attach (session, NULL, blob_t (), true);
123 
124     //  The engine will destroy itself, so let's just drop the pointer here and
125     //  start termination of the init object.
126     engine = NULL;
127     terminate ();
128 }
129 
process_plug()130 void zmq::zmq_init_t::process_plug ()
131 {
132     zmq_assert (engine);
133     engine->plug (io_thread, this);
134 }
135 
process_unplug()136 void zmq::zmq_init_t::process_unplug ()
137 {
138     if (engine)
139         engine->unplug ();
140 }
141 
finalise_initialisation()142 void zmq::zmq_init_t::finalise_initialisation ()
143 {
144      //  Unplug and prepare to dispatch engine.
145      if (sent && received) {
146         ephemeral_engine = engine;
147         engine = NULL;
148         ephemeral_engine->unplug ();
149         return;
150     }
151 }
152 
dispatch_engine()153 void zmq::zmq_init_t::dispatch_engine ()
154 {
155     if (sent && received) {
156 
157         //  Engine must be detached.
158         zmq_assert (!engine);
159         zmq_assert (ephemeral_engine);
160 
161         //  If we know what session we belong to, it's easy, just send the
162         //  engine to that session and destroy the init object. Note that we
163         //  know about the session only if this object is owned by it. Thus,
164         //  lifetime of this object in contained in the lifetime of the session
165         //  so the pointer cannot become invalid without notice.
166         if (session) {
167             send_attach (session, ephemeral_engine, peer_identity, true);
168             terminate ();
169             return;
170         }
171 
172         //  All the cases below are listener-based. Therefore we need the socket
173         //  reference so that new sessions can bind to that socket.
174         zmq_assert (socket);
175 
176         //  We have no associated session. If the peer has no identity we'll
177         //  create a transient session for the connection. Note that
178         //  seqnum is incremented to account for attach command before the
179         //  session is launched. That way we are sure it won't terminate before
180         //  being attached.
181         if (peer_identity [0] == 0) {
182             session = new (std::nothrow) transient_session_t (io_thread,
183                 socket, options);
184             alloc_assert (session);
185             session->inc_seqnum ();
186             launch_sibling (session);
187             send_attach (session, ephemeral_engine, peer_identity, false);
188             terminate ();
189             return;
190         }
191 
192         //  Try to find the session corresponding to the peer's identity.
193         //  If found, send the engine to that session and destroy this object.
194         //  Note that session's seqnum is incremented by find_session rather
195         //  than by send_attach.
196         session = socket->find_session (peer_identity);
197         if (session) {
198             send_attach (session, ephemeral_engine, peer_identity, false);
199             terminate ();
200             return;
201         }
202 
203         //  There's no such named session. We have to create one. Note that
204         //  seqnum is incremented to account for attach command before the
205         //  session is launched. That way we are sure it won't terminate before
206         //  being attached.
207         session = new (std::nothrow) named_session_t (io_thread, socket,
208             options, peer_identity);
209         alloc_assert (session);
210         session->inc_seqnum ();
211         launch_sibling (session);
212         send_attach (session, ephemeral_engine, peer_identity, false);
213         terminate ();
214         return;
215     }
216 }
217