1 /*
2 Copyright (c) 2007-2010 iMatix Corporation
3
4 This file is part of 0MQ.
5
6 0MQ is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 0MQ is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include <string.h>
21 #include <stdarg.h>
22
23 #include "object.hpp"
24 #include "ctx.hpp"
25 #include "err.hpp"
26 #include "pipe.hpp"
27 #include "io_thread.hpp"
28 #include "session.hpp"
29 #include "socket_base.hpp"
30
object_t(ctx_t * ctx_,uint32_t tid_)31 zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
32 ctx (ctx_),
33 tid (tid_)
34 {
35 }
36
object_t(object_t * parent_)37 zmq::object_t::object_t (object_t *parent_) :
38 ctx (parent_->ctx),
39 tid (parent_->tid)
40 {
41 }
42
~object_t()43 zmq::object_t::~object_t ()
44 {
45 }
46
get_tid()47 uint32_t zmq::object_t::get_tid ()
48 {
49 return tid;
50 }
51
get_ctx()52 zmq::ctx_t *zmq::object_t::get_ctx ()
53 {
54 return ctx;
55 }
56
process_command(command_t & cmd_)57 void zmq::object_t::process_command (command_t &cmd_)
58 {
59 switch (cmd_.type) {
60
61 case command_t::activate_reader:
62 process_activate_reader ();
63 break;
64
65 case command_t::activate_writer:
66 process_activate_writer (cmd_.args.activate_writer.msgs_read);
67 break;
68
69 case command_t::stop:
70 process_stop ();
71 break;
72
73 case command_t::plug:
74 process_plug ();
75 process_seqnum ();
76 return;
77
78 case command_t::own:
79 process_own (cmd_.args.own.object);
80 process_seqnum ();
81 break;
82
83 case command_t::attach:
84 process_attach (cmd_.args.attach.engine,
85 cmd_.args.attach.peer_identity ?
86 blob_t (cmd_.args.attach.peer_identity,
87 cmd_.args.attach.peer_identity_size) : blob_t ());
88 process_seqnum ();
89 break;
90
91 case command_t::bind:
92 process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
93 cmd_.args.bind.peer_identity ? blob_t (cmd_.args.bind.peer_identity,
94 cmd_.args.bind.peer_identity_size) : blob_t ());
95 process_seqnum ();
96 break;
97
98 case command_t::pipe_term:
99 process_pipe_term ();
100 return;
101
102 case command_t::pipe_term_ack:
103 process_pipe_term_ack ();
104 break;
105
106 case command_t::term_req:
107 process_term_req (cmd_.args.term_req.object);
108 break;
109
110 case command_t::term:
111 process_term (cmd_.args.term.linger);
112 break;
113
114 case command_t::term_ack:
115 process_term_ack ();
116 break;
117
118 case command_t::reap:
119 process_reap (cmd_.args.reap.socket);
120 break;
121
122 case command_t::reaped:
123 process_reaped ();
124 break;
125
126 default:
127 zmq_assert (false);
128 }
129
130 // The assumption here is that each command is processed once only,
131 // so deallocating it after processing is all right.
132 deallocate_command (&cmd_);
133 }
134
register_endpoint(const char * addr_,endpoint_t & endpoint_)135 int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
136 {
137 return ctx->register_endpoint (addr_, endpoint_);
138 }
139
unregister_endpoints(socket_base_t * socket_)140 void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
141 {
142 return ctx->unregister_endpoints (socket_);
143 }
144
find_endpoint(const char * addr_)145 zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
146 {
147 return ctx->find_endpoint (addr_);
148 }
149
destroy_socket(socket_base_t * socket_)150 void zmq::object_t::destroy_socket (socket_base_t *socket_)
151 {
152 ctx->destroy_socket (socket_);
153 }
154
log(const char * format_,...)155 void zmq::object_t::log (const char *format_, ...)
156 {
157 va_list args;
158 va_start (args, format_);
159 ctx->log (format_, args);
160 va_end (args);
161 }
162
choose_io_thread(uint64_t affinity_)163 zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
164 {
165 return ctx->choose_io_thread (affinity_);
166 }
167
send_stop()168 void zmq::object_t::send_stop ()
169 {
170 // 'stop' command goes always from administrative thread to
171 // the current object.
172 command_t cmd;
173 #if defined ZMQ_MAKE_VALGRIND_HAPPY
174 memset (&cmd, 0, sizeof (cmd));
175 #endif
176 cmd.destination = this;
177 cmd.type = command_t::stop;
178 ctx->send_command (tid, cmd);
179 }
180
send_plug(own_t * destination_,bool inc_seqnum_)181 void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
182 {
183 if (inc_seqnum_)
184 destination_->inc_seqnum ();
185
186 command_t cmd;
187 #if defined ZMQ_MAKE_VALGRIND_HAPPY
188 memset (&cmd, 0, sizeof (cmd));
189 #endif
190 cmd.destination = destination_;
191 cmd.type = command_t::plug;
192 send_command (cmd);
193 }
194
send_own(own_t * destination_,own_t * object_)195 void zmq::object_t::send_own (own_t *destination_, own_t *object_)
196 {
197 destination_->inc_seqnum ();
198 command_t cmd;
199 #if defined ZMQ_MAKE_VALGRIND_HAPPY
200 memset (&cmd, 0, sizeof (cmd));
201 #endif
202 cmd.destination = destination_;
203 cmd.type = command_t::own;
204 cmd.args.own.object = object_;
205 send_command (cmd);
206 }
207
send_attach(session_t * destination_,i_engine * engine_,const blob_t & peer_identity_,bool inc_seqnum_)208 void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
209 const blob_t &peer_identity_, bool inc_seqnum_)
210 {
211 if (inc_seqnum_)
212 destination_->inc_seqnum ();
213
214 command_t cmd;
215 #if defined ZMQ_MAKE_VALGRIND_HAPPY
216 memset (&cmd, 0, sizeof (cmd));
217 #endif
218 cmd.destination = destination_;
219 cmd.type = command_t::attach;
220 cmd.args.attach.engine = engine_;
221 if (peer_identity_.empty ()) {
222 cmd.args.attach.peer_identity_size = 0;
223 cmd.args.attach.peer_identity = NULL;
224 }
225 else {
226 zmq_assert (peer_identity_.size () <= 0xff);
227 cmd.args.attach.peer_identity_size =
228 (unsigned char) peer_identity_.size ();
229 cmd.args.attach.peer_identity =
230 (unsigned char*) malloc (peer_identity_.size ());
231 alloc_assert (cmd.args.attach.peer_identity_size);
232 memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
233 peer_identity_.size ());
234 }
235 send_command (cmd);
236 }
237
send_bind(own_t * destination_,reader_t * in_pipe_,writer_t * out_pipe_,const blob_t & peer_identity_,bool inc_seqnum_)238 void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
239 writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_)
240 {
241 if (inc_seqnum_)
242 destination_->inc_seqnum ();
243
244 command_t cmd;
245 #if defined ZMQ_MAKE_VALGRIND_HAPPY
246 memset (&cmd, 0, sizeof (cmd));
247 #endif
248 cmd.destination = destination_;
249 cmd.type = command_t::bind;
250 cmd.args.bind.in_pipe = in_pipe_;
251 cmd.args.bind.out_pipe = out_pipe_;
252 if (peer_identity_.empty ()) {
253 cmd.args.bind.peer_identity_size = 0;
254 cmd.args.bind.peer_identity = NULL;
255 }
256 else {
257 zmq_assert (peer_identity_.size () <= 0xff);
258 cmd.args.bind.peer_identity_size =
259 (unsigned char) peer_identity_.size ();
260 cmd.args.bind.peer_identity =
261 (unsigned char*) malloc (peer_identity_.size ());
262 alloc_assert (cmd.args.bind.peer_identity_size);
263 memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
264 peer_identity_.size ());
265 }
266 send_command (cmd);
267 }
268
send_activate_reader(reader_t * destination_)269 void zmq::object_t::send_activate_reader (reader_t *destination_)
270 {
271 command_t cmd;
272 #if defined ZMQ_MAKE_VALGRIND_HAPPY
273 memset (&cmd, 0, sizeof (cmd));
274 #endif
275 cmd.destination = destination_;
276 cmd.type = command_t::activate_reader;
277 send_command (cmd);
278 }
279
send_activate_writer(writer_t * destination_,uint64_t msgs_read_)280 void zmq::object_t::send_activate_writer (writer_t *destination_,
281 uint64_t msgs_read_)
282 {
283 command_t cmd;
284 #if defined ZMQ_MAKE_VALGRIND_HAPPY
285 memset (&cmd, 0, sizeof (cmd));
286 #endif
287 cmd.destination = destination_;
288 cmd.type = command_t::activate_writer;
289 cmd.args.activate_writer.msgs_read = msgs_read_;
290 send_command (cmd);
291 }
292
send_pipe_term(writer_t * destination_)293 void zmq::object_t::send_pipe_term (writer_t *destination_)
294 {
295 command_t cmd;
296 #if defined ZMQ_MAKE_VALGRIND_HAPPY
297 memset (&cmd, 0, sizeof (cmd));
298 #endif
299 cmd.destination = destination_;
300 cmd.type = command_t::pipe_term;
301 send_command (cmd);
302 }
303
send_pipe_term_ack(reader_t * destination_)304 void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
305 {
306 command_t cmd;
307 #if defined ZMQ_MAKE_VALGRIND_HAPPY
308 memset (&cmd, 0, sizeof (cmd));
309 #endif
310 cmd.destination = destination_;
311 cmd.type = command_t::pipe_term_ack;
312 send_command (cmd);
313 }
314
send_term_req(own_t * destination_,own_t * object_)315 void zmq::object_t::send_term_req (own_t *destination_,
316 own_t *object_)
317 {
318 command_t cmd;
319 #if defined ZMQ_MAKE_VALGRIND_HAPPY
320 memset (&cmd, 0, sizeof (cmd));
321 #endif
322 cmd.destination = destination_;
323 cmd.type = command_t::term_req;
324 cmd.args.term_req.object = object_;
325 send_command (cmd);
326 }
327
send_term(own_t * destination_,int linger_)328 void zmq::object_t::send_term (own_t *destination_, int linger_)
329 {
330 command_t cmd;
331 #if defined ZMQ_MAKE_VALGRIND_HAPPY
332 memset (&cmd, 0, sizeof (cmd));
333 #endif
334 cmd.destination = destination_;
335 cmd.type = command_t::term;
336 cmd.args.term.linger = linger_;
337 send_command (cmd);
338 }
339
send_term_ack(own_t * destination_)340 void zmq::object_t::send_term_ack (own_t *destination_)
341 {
342 command_t cmd;
343 #if defined ZMQ_MAKE_VALGRIND_HAPPY
344 memset (&cmd, 0, sizeof (cmd));
345 #endif
346 cmd.destination = destination_;
347 cmd.type = command_t::term_ack;
348 send_command (cmd);
349 }
350
send_reap(class socket_base_t * socket_)351 void zmq::object_t::send_reap (class socket_base_t *socket_)
352 {
353 command_t cmd;
354 #if defined ZMQ_MAKE_VALGRIND_HAPPY
355 memset (&cmd, 0, sizeof (cmd));
356 #endif
357 cmd.destination = ctx->get_reaper ();
358 cmd.type = command_t::reap;
359 cmd.args.reap.socket = socket_;
360 send_command (cmd);
361 }
362
send_reaped()363 void zmq::object_t::send_reaped ()
364 {
365 command_t cmd;
366 #if defined ZMQ_MAKE_VALGRIND_HAPPY
367 memset (&cmd, 0, sizeof (cmd));
368 #endif
369 cmd.destination = ctx->get_reaper ();
370 cmd.type = command_t::reaped;
371 send_command (cmd);
372 }
373
send_done()374 void zmq::object_t::send_done ()
375 {
376 command_t cmd;
377 #if defined ZMQ_MAKE_VALGRIND_HAPPY
378 memset (&cmd, 0, sizeof (cmd));
379 #endif
380 cmd.destination = NULL;
381 cmd.type = command_t::done;
382 ctx->send_command (ctx_t::term_tid, cmd);
383 }
384
process_stop()385 void zmq::object_t::process_stop ()
386 {
387 zmq_assert (false);
388 }
389
process_plug()390 void zmq::object_t::process_plug ()
391 {
392 zmq_assert (false);
393 }
394
process_own(own_t * object_)395 void zmq::object_t::process_own (own_t *object_)
396 {
397 zmq_assert (false);
398 }
399
process_attach(i_engine * engine_,const blob_t & peer_identity_)400 void zmq::object_t::process_attach (i_engine *engine_,
401 const blob_t &peer_identity_)
402 {
403 zmq_assert (false);
404 }
405
process_bind(reader_t * in_pipe_,writer_t * out_pipe_,const blob_t & peer_identity_)406 void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
407 const blob_t &peer_identity_)
408 {
409 zmq_assert (false);
410 }
411
process_activate_reader()412 void zmq::object_t::process_activate_reader ()
413 {
414 zmq_assert (false);
415 }
416
process_activate_writer(uint64_t msgs_read_)417 void zmq::object_t::process_activate_writer (uint64_t msgs_read_)
418 {
419 zmq_assert (false);
420 }
421
process_pipe_term()422 void zmq::object_t::process_pipe_term ()
423 {
424 zmq_assert (false);
425 }
426
process_pipe_term_ack()427 void zmq::object_t::process_pipe_term_ack ()
428 {
429 zmq_assert (false);
430 }
431
process_term_req(own_t * object_)432 void zmq::object_t::process_term_req (own_t *object_)
433 {
434 zmq_assert (false);
435 }
436
process_term(int linger_)437 void zmq::object_t::process_term (int linger_)
438 {
439 zmq_assert (false);
440 }
441
process_term_ack()442 void zmq::object_t::process_term_ack ()
443 {
444 zmq_assert (false);
445 }
446
process_reap(class socket_base_t * socket_)447 void zmq::object_t::process_reap (class socket_base_t *socket_)
448 {
449 zmq_assert (false);
450 }
451
process_reaped()452 void zmq::object_t::process_reaped ()
453 {
454 zmq_assert (false);
455 }
456
process_seqnum()457 void zmq::object_t::process_seqnum ()
458 {
459 zmq_assert (false);
460 }
461
send_command(command_t & cmd_)462 void zmq::object_t::send_command (command_t &cmd_)
463 {
464 ctx->send_command (cmd_.destination->get_tid (), cmd_);
465 }
466
467