1 /*
2     Copyright (c) 2007-2010 iMatix Corporation
3 
4     This file is part of 0MQ.
5 
6     0MQ is free software; you can redistribute it and/or modify it under
7     the terms of the GNU Lesser General Public License as published by
8     the Free Software Foundation; either version 3 of the License, or
9     (at your option) any later version.
10 
11     0MQ is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU Lesser General Public License for more details.
15 
16     You should have received a copy of the GNU Lesser General Public License
17     along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #ifndef __ZMQ_HPP_INCLUDED__
21 #define __ZMQ_HPP_INCLUDED__
22 
23 #include "zmq.h"
24 
25 #include <cassert>
26 #include <cstring>
27 #include <exception>
28 
29 namespace zmq
30 {
31 
32     typedef zmq_free_fn free_fn;
33     typedef zmq_pollitem_t pollitem_t;
34 
35     class error_t : public std::exception
36     {
37     public:
38 
error_t()39         error_t () : errnum (zmq_errno ()) {}
40 
what() const41         virtual const char *what () const throw ()
42         {
43             return zmq_strerror (errnum);
44         }
45 
num() const46         int num () const
47         {
48             return errnum;
49         }
50 
51     private:
52 
53         int errnum;
54     };
55 
poll(zmq_pollitem_t * items_,int nitems_,long timeout_=-1)56     inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
57     {
58         int rc = zmq_poll (items_, nitems_, timeout_);
59         if (rc < 0)
60             throw error_t ();
61         return rc;
62     }
63 
device(int device_,void * insocket_,void * outsocket_)64     inline void device (int device_, void * insocket_, void* outsocket_)
65     {
66         int rc = zmq_device (device_, insocket_, outsocket_);
67         if (rc != 0)
68             throw error_t ();
69     }
70 
71     class message_t : private zmq_msg_t
72     {
73         friend class socket_t;
74 
75     public:
76 
message_t()77         inline message_t ()
78         {
79             int rc = zmq_msg_init (this);
80             if (rc != 0)
81                 throw error_t ();
82         }
83 
message_t(size_t size_)84         inline message_t (size_t size_)
85         {
86             int rc = zmq_msg_init_size (this, size_);
87             if (rc != 0)
88                 throw error_t ();
89         }
90 
message_t(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)91         inline message_t (void *data_, size_t size_, free_fn *ffn_,
92             void *hint_ = NULL)
93         {
94             int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
95             if (rc != 0)
96                 throw error_t ();
97         }
98 
~message_t()99         inline ~message_t ()
100         {
101             int rc = zmq_msg_close (this);
102             assert (rc == 0);
103         }
104 
rebuild()105         inline void rebuild ()
106         {
107             int rc = zmq_msg_close (this);
108             if (rc != 0)
109                 throw error_t ();
110             rc = zmq_msg_init (this);
111             if (rc != 0)
112                 throw error_t ();
113         }
114 
rebuild(size_t size_)115         inline void rebuild (size_t size_)
116         {
117             int rc = zmq_msg_close (this);
118             if (rc != 0)
119                 throw error_t ();
120             rc = zmq_msg_init_size (this, size_);
121             if (rc != 0)
122                 throw error_t ();
123         }
124 
rebuild(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)125         inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
126             void *hint_ = NULL)
127         {
128             int rc = zmq_msg_close (this);
129             if (rc != 0)
130                 throw error_t ();
131             rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
132             if (rc != 0)
133                 throw error_t ();
134         }
135 
move(message_t * msg_)136         inline void move (message_t *msg_)
137         {
138             int rc = zmq_msg_move (this, (zmq_msg_t*) msg_);
139             if (rc != 0)
140                 throw error_t ();
141         }
142 
copy(message_t * msg_)143         inline void copy (message_t *msg_)
144         {
145             int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_);
146             if (rc != 0)
147                 throw error_t ();
148         }
149 
data()150         inline void *data ()
151         {
152             return zmq_msg_data (this);
153         }
154 
size()155         inline size_t size ()
156         {
157             return zmq_msg_size (this);
158         }
159 
160     private:
161 
162         //  Disable implicit message copying, so that users won't use shared
163         //  messages (less efficient) without being aware of the fact.
164         message_t (const message_t&);
165         void operator = (const message_t&);
166     };
167 
168     class context_t
169     {
170         friend class socket_t;
171 
172     public:
173 
context_t(int io_threads_)174         inline context_t (int io_threads_)
175         {
176             ptr = zmq_init (io_threads_);
177             if (ptr == NULL)
178                 throw error_t ();
179         }
180 
~context_t()181         inline ~context_t ()
182         {
183             int rc = zmq_term (ptr);
184             assert (rc == 0);
185         }
186 
187         //  Be careful with this, it's probably only useful for
188         //  using the C api together with an existing C++ api.
189         //  Normally you should never need to use this.
operator void*()190         inline operator void* ()
191         {
192             return ptr;
193         }
194 
195     private:
196 
197         void *ptr;
198 
199         context_t (const context_t&);
200         void operator = (const context_t&);
201     };
202 
203     class socket_t
204     {
205     public:
206 
socket_t(context_t & context_,int type_)207         inline socket_t (context_t &context_, int type_)
208         {
209             ptr = zmq_socket (context_.ptr, type_);
210             if (ptr == NULL)
211                 throw error_t ();
212         }
213 
~socket_t()214         inline ~socket_t ()
215         {
216             int rc = zmq_close (ptr);
217             assert (rc == 0);
218         }
219 
operator void*()220         inline operator void* ()
221         {
222             return ptr;
223         }
224 
setsockopt(int option_,const void * optval_,size_t optvallen_)225         inline void setsockopt (int option_, const void *optval_,
226             size_t optvallen_)
227         {
228             int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
229             if (rc != 0)
230                 throw error_t ();
231         }
232 
getsockopt(int option_,void * optval_,size_t * optvallen_)233         inline void getsockopt (int option_, void *optval_,
234             size_t *optvallen_)
235         {
236             int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
237             if (rc != 0)
238                 throw error_t ();
239         }
240 
bind(const char * addr_)241         inline void bind (const char *addr_)
242         {
243             int rc = zmq_bind (ptr, addr_);
244             if (rc != 0)
245                 throw error_t ();
246         }
247 
connect(const char * addr_)248         inline void connect (const char *addr_)
249         {
250             int rc = zmq_connect (ptr, addr_);
251             if (rc != 0)
252                 throw error_t ();
253         }
254 
send(message_t & msg_,int flags_=0)255         inline bool send (message_t &msg_, int flags_ = 0)
256         {
257             int rc = zmq_send (ptr, &msg_, flags_);
258             if (rc == 0)
259                 return true;
260             if (rc == -1 && zmq_errno () == EAGAIN)
261                 return false;
262             throw error_t ();
263         }
264 
recv(message_t * msg_,int flags_=0)265         inline bool recv (message_t *msg_, int flags_ = 0)
266         {
267             int rc = zmq_recv (ptr, msg_, flags_);
268             if (rc == 0)
269                 return true;
270             if (rc == -1 && zmq_errno () == EAGAIN)
271                 return false;
272             throw error_t ();
273         }
274 
275     private:
276 
277         void *ptr;
278 
279         socket_t (const socket_t&);
280         void operator = (const socket_t&);
281     };
282 
283 }
284 
285 #endif
286