1 /*
2     Copyright (c) 2007-2010 iMatix Corporation
3 
4     This file is part of 0MQ.
5 
6     0MQ is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License as published by
8     the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     0MQ is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU Lesser General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include <string.h>
21 #include <stdarg.h>
22 
23 #include "object.hpp"
24 #include "ctx.hpp"
25 #include "err.hpp"
26 #include "pipe.hpp"
27 #include "io_thread.hpp"
28 #include "session.hpp"
29 #include "socket_base.hpp"
30 
object_t(ctx_t * ctx_,uint32_t tid_)31 zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
32     ctx (ctx_),
33     tid (tid_)
34 {
35 }
36 
object_t(object_t * parent_)37 zmq::object_t::object_t (object_t *parent_) :
38     ctx (parent_->ctx),
39     tid (parent_->tid)
40 {
41 }
42 
~object_t()43 zmq::object_t::~object_t ()
44 {
45 }
46 
get_tid()47 uint32_t zmq::object_t::get_tid ()
48 {
49     return tid;
50 }
51 
get_ctx()52 zmq::ctx_t *zmq::object_t::get_ctx ()
53 {
54     return ctx;
55 }
56 
process_command(command_t & cmd_)57 void zmq::object_t::process_command (command_t &cmd_)
58 {
59     switch (cmd_.type) {
60 
61     case command_t::activate_reader:
62         process_activate_reader ();
63         break;
64 
65     case command_t::activate_writer:
66         process_activate_writer (cmd_.args.activate_writer.msgs_read);
67         break;
68 
69     case command_t::stop:
70         process_stop ();
71         break;
72 
73     case command_t::plug:
74         process_plug ();
75         process_seqnum ();
76         return;
77 
78     case command_t::own:
79         process_own (cmd_.args.own.object);
80         process_seqnum ();
81         break;
82 
83     case command_t::attach:
84         process_attach (cmd_.args.attach.engine,
85             cmd_.args.attach.peer_identity ?
86             blob_t (cmd_.args.attach.peer_identity,
87             cmd_.args.attach.peer_identity_size) : blob_t ());
88         process_seqnum ();
89         break;
90 
91     case command_t::bind:
92         process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
93             cmd_.args.bind.peer_identity ? blob_t (cmd_.args.bind.peer_identity,
94             cmd_.args.bind.peer_identity_size) : blob_t ());
95         process_seqnum ();
96         break;
97 
98     case command_t::pipe_term:
99         process_pipe_term ();
100         return;
101 
102     case command_t::pipe_term_ack:
103         process_pipe_term_ack ();
104         break;
105 
106     case command_t::term_req:
107         process_term_req (cmd_.args.term_req.object);
108         break;
109 
110     case command_t::term:
111         process_term (cmd_.args.term.linger);
112         break;
113 
114     case command_t::term_ack:
115         process_term_ack ();
116         break;
117 
118     case command_t::reap:
119         process_reap (cmd_.args.reap.socket);
120         break;
121 
122     case command_t::reaped:
123         process_reaped ();
124         break;
125 
126     default:
127         zmq_assert (false);
128     }
129 
130     //  The assumption here is that each command is processed once only,
131     //  so deallocating it after processing is all right.
132     deallocate_command (&cmd_);
133 }
134 
register_endpoint(const char * addr_,endpoint_t & endpoint_)135 int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
136 {
137     return ctx->register_endpoint (addr_, endpoint_);
138 }
139 
unregister_endpoints(socket_base_t * socket_)140 void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
141 {
142     return ctx->unregister_endpoints (socket_);
143 }
144 
find_endpoint(const char * addr_)145 zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
146 {
147     return ctx->find_endpoint (addr_);
148 }
149 
destroy_socket(socket_base_t * socket_)150 void zmq::object_t::destroy_socket (socket_base_t *socket_)
151 {
152     ctx->destroy_socket (socket_);
153 }
154 
log(const char * format_,...)155 void zmq::object_t::log (const char *format_, ...)
156 {
157     va_list args;
158     va_start (args, format_);
159     ctx->log (format_, args);
160     va_end (args);
161 }
162 
choose_io_thread(uint64_t affinity_)163 zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
164 {
165     return ctx->choose_io_thread (affinity_);
166 }
167 
send_stop()168 void zmq::object_t::send_stop ()
169 {
170     //  'stop' command goes always from administrative thread to
171     //  the current object.
172     command_t cmd;
173 #if defined ZMQ_MAKE_VALGRIND_HAPPY
174     memset (&cmd, 0, sizeof (cmd));
175 #endif
176     cmd.destination = this;
177     cmd.type = command_t::stop;
178     ctx->send_command (tid, cmd);
179 }
180 
send_plug(own_t * destination_,bool inc_seqnum_)181 void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
182 {
183     if (inc_seqnum_)
184         destination_->inc_seqnum ();
185 
186     command_t cmd;
187 #if defined ZMQ_MAKE_VALGRIND_HAPPY
188     memset (&cmd, 0, sizeof (cmd));
189 #endif
190     cmd.destination = destination_;
191     cmd.type = command_t::plug;
192     send_command (cmd);
193 }
194 
send_own(own_t * destination_,own_t * object_)195 void zmq::object_t::send_own (own_t *destination_, own_t *object_)
196 {
197     destination_->inc_seqnum ();
198     command_t cmd;
199 #if defined ZMQ_MAKE_VALGRIND_HAPPY
200     memset (&cmd, 0, sizeof (cmd));
201 #endif
202     cmd.destination = destination_;
203     cmd.type = command_t::own;
204     cmd.args.own.object = object_;
205     send_command (cmd);
206 }
207 
send_attach(session_t * destination_,i_engine * engine_,const blob_t & peer_identity_,bool inc_seqnum_)208 void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
209     const blob_t &peer_identity_, bool inc_seqnum_)
210 {
211     if (inc_seqnum_)
212         destination_->inc_seqnum ();
213 
214     command_t cmd;
215 #if defined ZMQ_MAKE_VALGRIND_HAPPY
216     memset (&cmd, 0, sizeof (cmd));
217 #endif
218     cmd.destination = destination_;
219     cmd.type = command_t::attach;
220     cmd.args.attach.engine = engine_;
221     if (peer_identity_.empty ()) {
222         cmd.args.attach.peer_identity_size = 0;
223         cmd.args.attach.peer_identity = NULL;
224     }
225     else {
226         zmq_assert (peer_identity_.size () <= 0xff);
227         cmd.args.attach.peer_identity_size =
228             (unsigned char) peer_identity_.size ();
229         cmd.args.attach.peer_identity =
230             (unsigned char*) malloc (peer_identity_.size ());
231         alloc_assert (cmd.args.attach.peer_identity_size);
232         memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
233             peer_identity_.size ());
234     }
235     send_command (cmd);
236 }
237 
send_bind(own_t * destination_,reader_t * in_pipe_,writer_t * out_pipe_,const blob_t & peer_identity_,bool inc_seqnum_)238 void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
239     writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_)
240 {
241     if (inc_seqnum_)
242         destination_->inc_seqnum ();
243 
244     command_t cmd;
245 #if defined ZMQ_MAKE_VALGRIND_HAPPY
246     memset (&cmd, 0, sizeof (cmd));
247 #endif
248     cmd.destination = destination_;
249     cmd.type = command_t::bind;
250     cmd.args.bind.in_pipe = in_pipe_;
251     cmd.args.bind.out_pipe = out_pipe_;
252     if (peer_identity_.empty ()) {
253         cmd.args.bind.peer_identity_size = 0;
254         cmd.args.bind.peer_identity = NULL;
255     }
256     else {
257         zmq_assert (peer_identity_.size () <= 0xff);
258         cmd.args.bind.peer_identity_size =
259             (unsigned char) peer_identity_.size ();
260         cmd.args.bind.peer_identity =
261             (unsigned char*) malloc (peer_identity_.size ());
262         alloc_assert (cmd.args.bind.peer_identity_size);
263         memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
264             peer_identity_.size ());
265     }
266     send_command (cmd);
267 }
268 
send_activate_reader(reader_t * destination_)269 void zmq::object_t::send_activate_reader (reader_t *destination_)
270 {
271     command_t cmd;
272 #if defined ZMQ_MAKE_VALGRIND_HAPPY
273     memset (&cmd, 0, sizeof (cmd));
274 #endif
275     cmd.destination = destination_;
276     cmd.type = command_t::activate_reader;
277     send_command (cmd);
278 }
279 
send_activate_writer(writer_t * destination_,uint64_t msgs_read_)280 void zmq::object_t::send_activate_writer (writer_t *destination_,
281     uint64_t msgs_read_)
282 {
283     command_t cmd;
284 #if defined ZMQ_MAKE_VALGRIND_HAPPY
285     memset (&cmd, 0, sizeof (cmd));
286 #endif
287     cmd.destination = destination_;
288     cmd.type = command_t::activate_writer;
289     cmd.args.activate_writer.msgs_read = msgs_read_;
290     send_command (cmd);
291 }
292 
send_pipe_term(writer_t * destination_)293 void zmq::object_t::send_pipe_term (writer_t *destination_)
294 {
295     command_t cmd;
296 #if defined ZMQ_MAKE_VALGRIND_HAPPY
297     memset (&cmd, 0, sizeof (cmd));
298 #endif
299     cmd.destination = destination_;
300     cmd.type = command_t::pipe_term;
301     send_command (cmd);
302 }
303 
send_pipe_term_ack(reader_t * destination_)304 void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
305 {
306     command_t cmd;
307 #if defined ZMQ_MAKE_VALGRIND_HAPPY
308     memset (&cmd, 0, sizeof (cmd));
309 #endif
310     cmd.destination = destination_;
311     cmd.type = command_t::pipe_term_ack;
312     send_command (cmd);
313 }
314 
send_term_req(own_t * destination_,own_t * object_)315 void zmq::object_t::send_term_req (own_t *destination_,
316     own_t *object_)
317 {
318     command_t cmd;
319 #if defined ZMQ_MAKE_VALGRIND_HAPPY
320     memset (&cmd, 0, sizeof (cmd));
321 #endif
322     cmd.destination = destination_;
323     cmd.type = command_t::term_req;
324     cmd.args.term_req.object = object_;
325     send_command (cmd);
326 }
327 
send_term(own_t * destination_,int linger_)328 void zmq::object_t::send_term (own_t *destination_, int linger_)
329 {
330     command_t cmd;
331 #if defined ZMQ_MAKE_VALGRIND_HAPPY
332     memset (&cmd, 0, sizeof (cmd));
333 #endif
334     cmd.destination = destination_;
335     cmd.type = command_t::term;
336     cmd.args.term.linger = linger_;
337     send_command (cmd);
338 }
339 
send_term_ack(own_t * destination_)340 void zmq::object_t::send_term_ack (own_t *destination_)
341 {
342     command_t cmd;
343 #if defined ZMQ_MAKE_VALGRIND_HAPPY
344     memset (&cmd, 0, sizeof (cmd));
345 #endif
346     cmd.destination = destination_;
347     cmd.type = command_t::term_ack;
348     send_command (cmd);
349 }
350 
send_reap(class socket_base_t * socket_)351 void zmq::object_t::send_reap (class socket_base_t *socket_)
352 {
353     command_t cmd;
354 #if defined ZMQ_MAKE_VALGRIND_HAPPY
355     memset (&cmd, 0, sizeof (cmd));
356 #endif
357     cmd.destination = ctx->get_reaper ();
358     cmd.type = command_t::reap;
359     cmd.args.reap.socket = socket_;
360     send_command (cmd);
361 }
362 
send_reaped()363 void zmq::object_t::send_reaped ()
364 {
365     command_t cmd;
366 #if defined ZMQ_MAKE_VALGRIND_HAPPY
367     memset (&cmd, 0, sizeof (cmd));
368 #endif
369     cmd.destination = ctx->get_reaper ();
370     cmd.type = command_t::reaped;
371     send_command (cmd);
372 }
373 
send_done()374 void zmq::object_t::send_done ()
375 {
376     command_t cmd;
377 #if defined ZMQ_MAKE_VALGRIND_HAPPY
378     memset (&cmd, 0, sizeof (cmd));
379 #endif
380     cmd.destination = NULL;
381     cmd.type = command_t::done;
382     ctx->send_command (ctx_t::term_tid, cmd);
383 }
384 
process_stop()385 void zmq::object_t::process_stop ()
386 {
387     zmq_assert (false);
388 }
389 
process_plug()390 void zmq::object_t::process_plug ()
391 {
392     zmq_assert (false);
393 }
394 
process_own(own_t * object_)395 void zmq::object_t::process_own (own_t *object_)
396 {
397     zmq_assert (false);
398 }
399 
process_attach(i_engine * engine_,const blob_t & peer_identity_)400 void zmq::object_t::process_attach (i_engine *engine_,
401     const blob_t &peer_identity_)
402 {
403     zmq_assert (false);
404 }
405 
process_bind(reader_t * in_pipe_,writer_t * out_pipe_,const blob_t & peer_identity_)406 void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
407     const blob_t &peer_identity_)
408 {
409     zmq_assert (false);
410 }
411 
process_activate_reader()412 void zmq::object_t::process_activate_reader ()
413 {
414     zmq_assert (false);
415 }
416 
process_activate_writer(uint64_t msgs_read_)417 void zmq::object_t::process_activate_writer (uint64_t msgs_read_)
418 {
419     zmq_assert (false);
420 }
421 
process_pipe_term()422 void zmq::object_t::process_pipe_term ()
423 {
424     zmq_assert (false);
425 }
426 
process_pipe_term_ack()427 void zmq::object_t::process_pipe_term_ack ()
428 {
429     zmq_assert (false);
430 }
431 
process_term_req(own_t * object_)432 void zmq::object_t::process_term_req (own_t *object_)
433 {
434     zmq_assert (false);
435 }
436 
process_term(int linger_)437 void zmq::object_t::process_term (int linger_)
438 {
439     zmq_assert (false);
440 }
441 
process_term_ack()442 void zmq::object_t::process_term_ack ()
443 {
444     zmq_assert (false);
445 }
446 
process_reap(class socket_base_t * socket_)447 void zmq::object_t::process_reap (class socket_base_t *socket_)
448 {
449     zmq_assert (false);
450 }
451 
process_reaped()452 void zmq::object_t::process_reaped ()
453 {
454     zmq_assert (false);
455 }
456 
process_seqnum()457 void zmq::object_t::process_seqnum ()
458 {
459     zmq_assert (false);
460 }
461 
send_command(command_t & cmd_)462 void zmq::object_t::send_command (command_t &cmd_)
463 {
464     ctx->send_command (cmd_.destination->get_tid (), cmd_);
465 }
466 
467