1 /*
2     Copyright (c) 2007-2010 iMatix Corporation
3 
4     This file is part of 0MQ.
5 
6     0MQ is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License as published by
8     the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     0MQ is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU Lesser General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include <string.h>
21 
22 #include "zmq_init.hpp"
23 #include "transient_session.hpp"
24 #include "named_session.hpp"
25 #include "socket_base.hpp"
26 #include "zmq_engine.hpp"
27 #include "io_thread.hpp"
28 #include "session.hpp"
29 #include "uuid.hpp"
30 #include "blob.hpp"
31 #include "err.hpp"
32 
zmq_init_t(io_thread_t * io_thread_,socket_base_t * socket_,session_t * session_,fd_t fd_,const options_t & options_)33 zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
34       socket_base_t *socket_, session_t *session_, fd_t fd_,
35       const options_t &options_) :
36     own_t (io_thread_, options_),
37     ephemeral_engine (NULL),
38     sent (false),
39     received (false),
40     socket (socket_),
41     session (session_),
42     io_thread (io_thread_)
43 {
44     //  Create the engine object for this connection.
45     engine = new (std::nothrow) zmq_engine_t (fd_, options);
46     alloc_assert (engine);
47 }
48 
~zmq_init_t()49 zmq::zmq_init_t::~zmq_init_t ()
50 {
51     if (engine)
52         engine->terminate ();
53 }
54 
read(::zmq_msg_t * msg_)55 bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
56 {
57     //  If the identity was already sent, do nothing.
58     if (sent)
59         return false;
60 
61     //  Send the identity.
62     int rc = zmq_msg_init_size (msg_, options.identity.size ());
63     zmq_assert (rc == 0);
64     memcpy (zmq_msg_data (msg_), options.identity.c_str (),
65         options.identity.size ());
66     sent = true;
67 
68     //  Try finalize initialization.
69     finalise_initialisation ();
70 
71     return true;
72 }
73 
write(::zmq_msg_t * msg_)74 bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
75 {
76     //  If identity was already received, we are not interested
77     //  in subsequent messages.
78     if (received)
79         return false;
80 
81     //  Retreieve the remote identity. If it's empty, generate a unique name.
82     if (!zmq_msg_size (msg_)) {
83         unsigned char identity [uuid_t::uuid_blob_len + 1];
84         identity [0] = 0;
85         memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
86         peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
87     }
88     else {
89         peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
90             zmq_msg_size (msg_));
91     }
92     int rc = zmq_msg_close (msg_);
93     zmq_assert (rc == 0);
94 
95     received = true;
96 
97     //  Try finalize initialization.
98     finalise_initialisation ();
99 
100     return true;
101 }
102 
flush()103 void zmq::zmq_init_t::flush ()
104 {
105     //  Check if there's anything to flush.
106     if (!received)
107         return;
108 
109     //  Initialization is done, dispatch engine.
110     if (ephemeral_engine)
111         dispatch_engine ();
112 }
113 
detach()114 void zmq::zmq_init_t::detach ()
115 {
116     //  This function is called by engine when disconnection occurs.
117 
118     //  If there is an associated session, send it a null engine to let it know
119     //  that connection process was unsuccesful.
120     if (session)
121         send_attach (session, NULL, blob_t (), true);
122 
123     //  The engine will destroy itself, so let's just drop the pointer here and
124     //  start termination of the init object.
125     engine = NULL;
126     terminate ();
127 }
128 
process_plug()129 void zmq::zmq_init_t::process_plug ()
130 {
131     zmq_assert (engine);
132     engine->plug (io_thread, this);
133 }
134 
process_unplug()135 void zmq::zmq_init_t::process_unplug ()
136 {
137     if (engine)
138         engine->unplug ();
139 }
140 
finalise_initialisation()141 void zmq::zmq_init_t::finalise_initialisation ()
142 {
143      //  Unplug and prepare to dispatch engine.
144      if (sent && received) {
145         ephemeral_engine = engine;
146         engine = NULL;
147         ephemeral_engine->unplug ();
148         return;
149     }
150 }
151 
dispatch_engine()152 void zmq::zmq_init_t::dispatch_engine ()
153 {
154     if (sent && received) {
155 
156         //  Engine must be detached.
157         zmq_assert (!engine);
158         zmq_assert (ephemeral_engine);
159 
160         //  If we know what session we belong to, it's easy, just send the
161         //  engine to that session and destroy the init object. Note that we
162         //  know about the session only if this object is owned by it. Thus,
163         //  lifetime of this object in contained in the lifetime of the session
164         //  so the pointer cannot become invalid without notice.
165         if (session) {
166             send_attach (session, ephemeral_engine, peer_identity, true);
167             terminate ();
168             return;
169         }
170 
171         //  All the cases below are listener-based. Therefore we need the socket
172         //  reference so that new sessions can bind to that socket.
173         zmq_assert (socket);
174 
175         //  We have no associated session. If the peer has no identity we'll
176         //  create a transient session for the connection. Note that
177         //  seqnum is incremented to account for attach command before the
178         //  session is launched. That way we are sure it won't terminate before
179         //  being attached.
180         if (peer_identity [0] == 0) {
181             session = new (std::nothrow) transient_session_t (io_thread,
182                 socket, options);
183             alloc_assert (session);
184             session->inc_seqnum ();
185             launch_sibling (session);
186             send_attach (session, ephemeral_engine, peer_identity, false);
187             terminate ();
188             return;
189         }
190 
191         //  Try to find the session corresponding to the peer's identity.
192         //  If found, send the engine to that session and destroy this object.
193         //  Note that session's seqnum is incremented by find_session rather
194         //  than by send_attach.
195         session = socket->find_session (peer_identity);
196         if (session) {
197             send_attach (session, ephemeral_engine, peer_identity, false);
198             terminate ();
199             return;
200         }
201 
202         //  There's no such named session. We have to create one. Note that
203         //  seqnum is incremented to account for attach command before the
204         //  session is launched. That way we are sure it won't terminate before
205         //  being attached.
206         session = new (std::nothrow) named_session_t (io_thread, socket,
207             options, peer_identity);
208         alloc_assert (session);
209         session->inc_seqnum ();
210         launch_sibling (session);
211         send_attach (session, ephemeral_engine, peer_identity, false);
212         terminate ();
213         return;
214     }
215 }
216