1 /*
2     Copyright (c) 2007-2010 iMatix Corporation
3 
4     This file is part of 0MQ.
5 
6     0MQ is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License as published by
8     the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     0MQ is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU Lesser General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #ifndef __ZMQ_OBJECT_HPP_INCLUDED__
21 #define __ZMQ_OBJECT_HPP_INCLUDED__
22 
23 #include "../include/zmq.h"
24 
25 #include "stdint.hpp"
26 #include "blob.hpp"
27 
28 namespace zmq
29 {
    //  Base class for all objects that participate in inter-thread
    //  communication.
    //
    //  Every object carries a pointer to the context and the ID of the
    //  thread it belongs to (see the private members at the bottom of the
    //  class). Derived classes get a family of send_* functions for issuing
    //  commands to other objects and a matching family of virtual process_*
    //  handlers for reacting to commands they receive.
    //
    //  The copy constructor and assignment operator are declared private,
    //  so objects of this type cannot be copied from outside the class.

    class object_t
    {
    public:

        //  Creates an object from an explicit context pointer and thread ID.
        object_t (class ctx_t *ctx_, uint32_t tid_);

        //  Creates an object from a parent object.
        //  NOTE(review): presumably the context and thread ID are taken
        //  over from parent_ - confirm against the implementation file.
        object_t (object_t *parent_);

        //  Virtual, so that derived objects can be destroyed through
        //  a pointer to object_t.
        virtual ~object_t ();

        //  Accessors; presumably they return the thread ID and the context
        //  stored in the private members below - confirm against the
        //  implementation file.
        uint32_t get_tid ();
        ctx_t *get_ctx ();

        //  Entry point for a command addressed to this object.
        //  NOTE(review): presumably dispatches cmd_ to the matching
        //  process_* handler declared below - confirm against the
        //  implementation file.
        void process_command (struct command_t &cmd_);

    protected:

        //  Using the following functions, a socket is able to access the
        //  global repository of inproc endpoints.
        int register_endpoint (const char *addr_, struct endpoint_t &endpoint_);
        void unregister_endpoints (class socket_base_t *socket_);
        struct endpoint_t find_endpoint (const char *addr_);
        void destroy_socket (class socket_base_t *socket_);

        //  Logs a message. Takes a format string followed by a variable
        //  number of arguments (presumably printf-style - confirm against
        //  the implementation file).
        void log (const char *format_, ...);

        //  Chooses least loaded I/O thread.
        class io_thread_t *choose_io_thread (uint64_t affinity_);

        //  Derived object can use these functions to send commands
        //  to other objects. Where a destination_ parameter is present it
        //  names the object the command is addressed to. The inc_seqnum_
        //  flag defaults to true on send_plug, send_attach and send_bind;
        //  it is presumably related to the sequence numbers mentioned at
        //  process_seqnum below - confirm against the implementation file.
        void send_stop ();
        void send_plug (class own_t *destination_,
            bool inc_seqnum_ = true);
        void send_own (class own_t *destination_,
            class own_t *object_);
        void send_attach (class session_t *destination_,
             struct i_engine *engine_, const blob_t &peer_identity_,
             bool inc_seqnum_ = true);
        void send_bind (class own_t *destination_,
             class reader_t *in_pipe_, class writer_t *out_pipe_,
             const blob_t &peer_identity_, bool inc_seqnum_ = true);
        void send_activate_reader (class reader_t *destination_);
        void send_activate_writer (class writer_t *destination_,
             uint64_t msgs_read_);
        void send_pipe_term (class writer_t *destination_);
        void send_pipe_term_ack (class reader_t *destination_);
        void send_term_req (class own_t *destination_,
            class own_t *object_);
        void send_term (class own_t *destination_, int linger_);
        void send_term_ack (class own_t *destination_);
        void send_reap (class socket_base_t *socket_);
        void send_reaped ();
        void send_done ();

        //  These handlers can be overridden by the derived objects. They are
        //  called when command arrives from another thread. Each handler
        //  takes the same payload arguments as the corresponding send_*
        //  function above, minus the destination and the inc_seqnum_ flag.
        virtual void process_stop ();
        virtual void process_plug ();
        virtual void process_own (class own_t *object_);
        virtual void process_attach (struct i_engine *engine_,
            const blob_t &peer_identity_);
        virtual void process_bind (class reader_t *in_pipe_,
            class writer_t *out_pipe_, const blob_t &peer_identity_);
        virtual void process_activate_reader ();
        virtual void process_activate_writer (uint64_t msgs_read_);
        virtual void process_pipe_term ();
        virtual void process_pipe_term_ack ();
        virtual void process_term_req (class own_t *object_);
        virtual void process_term (int linger_);
        virtual void process_term_ack ();
        virtual void process_reap (class socket_base_t *socket_);
        virtual void process_reaped ();

        //  Special handler called after a command that requires a seqnum
        //  was processed. The implementation should catch up with its counter
        //  of processed commands here.
        virtual void process_seqnum ();

    private:

        //  Context provides access to the global state.
        class ctx_t *ctx;

        //  Thread ID of the thread the object belongs to.
        uint32_t tid;

        //  Private helper taking a complete command.
        //  NOTE(review): presumably the common path through which the
        //  send_* functions above deliver their commands - confirm against
        //  the implementation file.
        void send_command (command_t &cmd_);

        //  Copying is disabled: both members are declared private.
        object_t (const object_t&);
        const object_t &operator = (const object_t&);
    };
123 
124 }
125 
126 #endif
127