1 /*
2     Copyright (c) 2007-2011 iMatix Corporation
3     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
4 
5     This file is part of 0MQ.
6 
7     0MQ is free software; you can redistribute it and/or modify it under
8     the terms of the GNU Lesser General Public License as published by
9     the Free Software Foundation; either version 3 of the License, or
10     (at your option) any later version.
11 
12     0MQ is distributed in the hope that it will be useful,
13     but WITHOUT ANY WARRANTY; without even the implied warranty of
14     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15     GNU Lesser General Public License for more details.
16 
17     You should have received a copy of the GNU Lesser General Public License
18     along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #ifndef __ZMQ_HPP_INCLUDED__
22 #define __ZMQ_HPP_INCLUDED__
23 
24 #include "zmq.h"
25 
26 #include <algorithm>
27 #include <cassert>
28 #include <cstring>
29 #include <exception>
30 
31 // Detect whether the compiler supports C++11 rvalue references.
32 #if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && defined(__GXX_EXPERIMENTAL_CXX0X__))
33 #   define ZMQ_HAS_RVALUE_REFS
34 #endif
35 #if (defined(__clang__))
36 #   if __has_feature(cxx_rvalue_references)
37 #       define ZMQ_HAS_RVALUE_REFS
38 #   endif
39 #endif
40 #if (defined(_MSC_VER) && (_MSC_VER >= 1600))
41 #   define ZMQ_HAS_RVALUE_REFS
42 #endif
43 
44 // In order to prevent unused variable warnings when building in non-debug
45 // mode use this macro to make assertions.
46 #ifndef NDEBUG
47 #   define ZMQ_ASSERT(expression) assert(expression)
48 #else
49 #   define ZMQ_ASSERT(expression) (expression)
50 #endif
51 
52 namespace zmq
53 {
54 
55     typedef zmq_free_fn free_fn;
56     typedef zmq_pollitem_t pollitem_t;
57 
58     class error_t : public std::exception
59     {
60     public:
61 
error_t()62         error_t () : errnum (zmq_errno ()) {}
63 
what() const64         virtual const char *what () const throw ()
65         {
66             return zmq_strerror (errnum);
67         }
68 
num() const69         int num () const
70         {
71             return errnum;
72         }
73 
74     private:
75 
76         int errnum;
77     };
78 
poll(zmq_pollitem_t * items_,int nitems_,long timeout_=-1)79     inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
80     {
81         int rc = zmq_poll (items_, nitems_, timeout_);
82         if (rc < 0)
83             throw error_t ();
84         return rc;
85     }
86 
device(int device_,void * insocket_,void * outsocket_)87     inline void device (int device_, void * insocket_, void* outsocket_)
88     {
89         int rc = zmq_device (device_, insocket_, outsocket_);
90         if (rc != 0)
91             throw error_t ();
92     }
93 
version(int * major_,int * minor_,int * patch_)94     inline void version (int *major_, int *minor_, int *patch_)
95     {
96         zmq_version (major_, minor_, patch_);
97     }
98 
99     class message_t : private zmq_msg_t
100     {
101         friend class socket_t;
102 
103     public:
104 
message_t()105         inline message_t ()
106         {
107             int rc = zmq_msg_init (this);
108             if (rc != 0)
109                 throw error_t ();
110         }
111 
message_t(size_t size_)112         inline message_t (size_t size_)
113         {
114             int rc = zmq_msg_init_size (this, size_);
115             if (rc != 0)
116                 throw error_t ();
117         }
118 
message_t(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)119         inline message_t (void *data_, size_t size_, free_fn *ffn_,
120             void *hint_ = NULL)
121         {
122             int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
123             if (rc != 0)
124                 throw error_t ();
125         }
126 
~message_t()127         inline ~message_t ()
128         {
129             int rc = zmq_msg_close (this);
130             ZMQ_ASSERT (rc == 0);
131         }
132 
rebuild()133         inline void rebuild ()
134         {
135             int rc = zmq_msg_close (this);
136             if (rc != 0)
137                 throw error_t ();
138             rc = zmq_msg_init (this);
139             if (rc != 0)
140                 throw error_t ();
141         }
142 
rebuild(size_t size_)143         inline void rebuild (size_t size_)
144         {
145             int rc = zmq_msg_close (this);
146             if (rc != 0)
147                 throw error_t ();
148             rc = zmq_msg_init_size (this, size_);
149             if (rc != 0)
150                 throw error_t ();
151         }
152 
rebuild(void * data_,size_t size_,free_fn * ffn_,void * hint_=NULL)153         inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
154             void *hint_ = NULL)
155         {
156             int rc = zmq_msg_close (this);
157             if (rc != 0)
158                 throw error_t ();
159             rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
160             if (rc != 0)
161                 throw error_t ();
162         }
163 
move(message_t * msg_)164         inline void move (message_t *msg_)
165         {
166             int rc = zmq_msg_move (this, (zmq_msg_t*) msg_);
167             if (rc != 0)
168                 throw error_t ();
169         }
170 
copy(message_t * msg_)171         inline void copy (message_t *msg_)
172         {
173             int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_);
174             if (rc != 0)
175                 throw error_t ();
176         }
177 
data()178         inline void *data ()
179         {
180             return zmq_msg_data (this);
181         }
182 
size()183         inline size_t size ()
184         {
185             return zmq_msg_size (this);
186         }
187 
188     private:
189 
190         //  Disable implicit message copying, so that users won't use shared
191         //  messages (less efficient) without being aware of the fact.
192         message_t (const message_t&);
193         void operator = (const message_t&);
194     };
195 
196     class context_t
197     {
198         friend class socket_t;
199 
200     public:
201 
context_t(int io_threads_)202         inline context_t (int io_threads_)
203         {
204             ptr = zmq_init (io_threads_);
205             if (ptr == NULL)
206                 throw error_t ();
207         }
208 
209 #ifdef ZMQ_HAS_RVALUE_REFS
context_t(context_t && rhs)210         inline context_t(context_t&& rhs) : ptr(rhs.ptr)
211         {
212             rhs.ptr = NULL;
213         }
operator =(context_t && rhs)214         inline context_t& operator=(context_t&& rhs)
215         {
216             std::swap(ptr, rhs.ptr);
217             return *this;
218         }
219 #endif
220 
~context_t()221         inline ~context_t ()
222         {
223             if (ptr == NULL)
224                 return;
225             int rc = zmq_term (ptr);
226             ZMQ_ASSERT (rc == 0);
227         }
228 
229         //  Be careful with this, it's probably only useful for
230         //  using the C api together with an existing C++ api.
231         //  Normally you should never need to use this.
operator void*()232         inline operator void* ()
233         {
234             return ptr;
235         }
236 
237     private:
238 
239         void *ptr;
240 
241         context_t (const context_t&);
242         void operator = (const context_t&);
243     };
244 
245     class socket_t
246     {
247     public:
248 
socket_t(context_t & context_,int type_)249         inline socket_t (context_t &context_, int type_)
250         {
251             ptr = zmq_socket (context_.ptr, type_);
252             if (ptr == NULL)
253                 throw error_t ();
254         }
255 
256 #ifdef ZMQ_HAS_RVALUE_REFS
socket_t(socket_t && rhs)257         inline socket_t(socket_t&& rhs) : ptr(rhs.ptr)
258         {
259             rhs.ptr = NULL;
260         }
operator =(socket_t && rhs)261         inline socket_t& operator=(socket_t&& rhs)
262         {
263             std::swap(ptr, rhs.ptr);
264             return *this;
265         }
266 #endif
267 
~socket_t()268         inline ~socket_t ()
269         {
270             close();
271         }
272 
operator void*()273         inline operator void* ()
274         {
275             return ptr;
276         }
277 
close()278         inline void close()
279         {
280             if(ptr == NULL)
281                 // already closed
282                 return ;
283             int rc = zmq_close (ptr);
284             if (rc != 0)
285                 throw error_t ();
286             ptr = 0 ;
287         }
288 
setsockopt(int option_,const void * optval_,size_t optvallen_)289         inline void setsockopt (int option_, const void *optval_,
290             size_t optvallen_)
291         {
292             int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
293             if (rc != 0)
294                 throw error_t ();
295         }
296 
getsockopt(int option_,void * optval_,size_t * optvallen_)297         inline void getsockopt (int option_, void *optval_,
298             size_t *optvallen_)
299         {
300             int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
301             if (rc != 0)
302                 throw error_t ();
303         }
304 
bind(const char * addr_)305         inline void bind (const char *addr_)
306         {
307             int rc = zmq_bind (ptr, addr_);
308             if (rc != 0)
309                 throw error_t ();
310         }
311 
connect(const char * addr_)312         inline void connect (const char *addr_)
313         {
314             int rc = zmq_connect (ptr, addr_);
315             if (rc != 0)
316                 throw error_t ();
317         }
318 
send(message_t & msg_,int flags_=0)319         inline bool send (message_t &msg_, int flags_ = 0)
320         {
321             int rc = zmq_send (ptr, &msg_, flags_);
322             if (rc == 0)
323                 return true;
324             if (rc == -1 && zmq_errno () == EAGAIN)
325                 return false;
326             throw error_t ();
327         }
328 
recv(message_t * msg_,int flags_=0)329         inline bool recv (message_t *msg_, int flags_ = 0)
330         {
331             int rc = zmq_recv (ptr, msg_, flags_);
332             if (rc == 0)
333                 return true;
334             if (rc == -1 && zmq_errno () == EAGAIN)
335                 return false;
336             throw error_t ();
337         }
338 
339     private:
340 
341         void *ptr;
342 
343         socket_t (const socket_t&);
344         void operator = (const socket_t&);
345     };
346 
347 }
348 
349 #endif
350