1 /* 2 Copyright (c) 2007-2010 iMatix Corporation 3 4 This file is part of 0MQ. 5 6 0MQ is free software; you can redistribute it and/or modify it under 7 the terms of the GNU Lesser General Public License as published by 8 the Free Software Foundation; either version 3 of the License, or 9 (at your option) any later version. 10 11 0MQ is distributed in the hope that it will be useful, 12 but WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 GNU Lesser General Public License for more details. 15 16 You should have received a copy of the GNU Lesser General Public License 17 along with this program. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #ifndef __ZMQ_OBJECT_HPP_INCLUDED__ 21 #define __ZMQ_OBJECT_HPP_INCLUDED__ 22 23 #include "../include/zmq.h" 24 25 #include "stdint.hpp" 26 #include "blob.hpp" 27 28 namespace zmq 29 { 30 // Base class for all objects that participate in inter-thread 31 // communication. 32 33 class object_t 34 { 35 public: 36 37 object_t (class ctx_t *ctx_, uint32_t tid_); 38 object_t (object_t *parent_); 39 virtual ~object_t (); 40 41 uint32_t get_tid (); 42 ctx_t *get_ctx (); 43 void process_command (struct command_t &cmd_); 44 45 protected: 46 47 // Using following function, socket is able to access global 48 // repository of inproc endpoints. 49 int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); 50 void unregister_endpoints (class socket_base_t *socket_); 51 struct endpoint_t find_endpoint (const char *addr_); 52 void destroy_socket (class socket_base_t *socket_); 53 54 // Logs an message. 55 void log (const char *format_, ...); 56 57 // Chooses least loaded I/O thread. 58 class io_thread_t *choose_io_thread (uint64_t affinity_); 59 60 // Derived object can use these functions to send commands 61 // to other objects. 62 void send_stop (); 63 void send_plug (class own_t *destination_, 64 bool inc_seqnum_ = true); 65 void send_own (class own_t *destination_, 66 class own_t *object_); 67 void send_attach (class session_t *destination_, 68 struct i_engine *engine_, const blob_t &peer_identity_, 69 bool inc_seqnum_ = true); 70 void send_bind (class own_t *destination_, 71 class reader_t *in_pipe_, class writer_t *out_pipe_, 72 const blob_t &peer_identity_, bool inc_seqnum_ = true); 73 void send_activate_reader (class reader_t *destination_); 74 void send_activate_writer (class writer_t *destination_, 75 uint64_t msgs_read_); 76 void send_pipe_term (class writer_t *destination_); 77 void send_pipe_term_ack (class reader_t *destination_); 78 void send_term_req (class own_t *destination_, 79 class own_t *object_); 80 void send_term (class own_t *destination_, int linger_); 81 void send_term_ack (class own_t *destination_); 82 void send_reap (class socket_base_t *socket_); 83 void send_reaped (); 84 void send_done (); 85 86 // These handlers can be overloaded by the derived objects. They are 87 // called when command arrives from another thread. 88 virtual void process_stop (); 89 virtual void process_plug (); 90 virtual void process_own (class own_t *object_); 91 virtual void process_attach (struct i_engine *engine_, 92 const blob_t &peer_identity_); 93 virtual void process_bind (class reader_t *in_pipe_, 94 class writer_t *out_pipe_, const blob_t &peer_identity_); 95 virtual void process_activate_reader (); 96 virtual void process_activate_writer (uint64_t msgs_read_); 97 virtual void process_pipe_term (); 98 virtual void process_pipe_term_ack (); 99 virtual void process_term_req (class own_t *object_); 100 virtual void process_term (int linger_); 101 virtual void process_term_ack (); 102 virtual void process_reap (class socket_base_t *socket_); 103 virtual void process_reaped (); 104 105 // Special handler called after a command that requires a seqnum 106 // was processed. The implementation should catch up with its counter 107 // of processed commands here. 108 virtual void process_seqnum (); 109 110 private: 111 112 // Context provides access to the global state. 113 class ctx_t *ctx; 114 115 // Thread ID of the thread the object belongs to. 116 uint32_t tid; 117 118 void send_command (command_t &cmd_); 119 120 object_t (const object_t&); 121 const object_t &operator = (const object_t&); 122 }; 123 124 } 125 126 #endif 127